2 * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.awaitility.Awaitility.await;
11 import static org.hamcrest.Matchers.equalTo;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertFalse;
14 import static org.junit.Assert.assertNotNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.ArgumentMatchers.eq;
19 import static org.mockito.Mockito.timeout;
20 import static org.mockito.Mockito.verify;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.ActorSystem;
25 import akka.actor.Address;
26 import akka.actor.AddressFromURIString;
27 import akka.cluster.Cluster;
28 import akka.cluster.Member;
29 import akka.dispatch.Futures;
30 import akka.pattern.Patterns;
31 import akka.testkit.javadsl.TestKit;
32 import com.google.common.base.Stopwatch;
33 import com.google.common.base.Throwables;
34 import com.google.common.collect.ImmutableMap;
35 import com.google.common.collect.Range;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.ListenableFuture;
38 import com.google.common.util.concurrent.MoreExecutors;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import com.typesafe.config.ConfigFactory;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.Iterator;
45 import java.util.LinkedList;
46 import java.util.List;
47 import java.util.Optional;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.Executors;
52 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import java.util.concurrent.atomic.AtomicLong;
55 import java.util.function.Supplier;
56 import org.junit.After;
57 import org.junit.Assume;
58 import org.junit.Before;
59 import org.junit.Ignore;
60 import org.junit.Test;
61 import org.junit.runner.RunWith;
62 import org.junit.runners.Parameterized;
63 import org.junit.runners.Parameterized.Parameter;
64 import org.junit.runners.Parameterized.Parameters;
65 import org.mockito.Mockito;
66 import org.mockito.stubbing.Answer;
67 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
68 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
69 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
70 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
71 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
72 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
73 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
74 import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
75 import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
76 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
77 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
78 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
79 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
80 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
81 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
82 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
83 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
84 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
85 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
86 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
87 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
88 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
89 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
91 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
93 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
94 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
95 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
96 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
97 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
98 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
99 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
100 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
101 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
105 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
106 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
107 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
108 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
109 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
110 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
111 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
112 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
113 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
114 import org.opendaylight.yangtools.yang.common.Uint64;
115 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
116 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
117 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
118 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
119 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
125 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
126 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
127 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
128 import scala.concurrent.Await;
129 import scala.concurrent.Future;
130 import scala.concurrent.duration.FiniteDuration;
133 * End-to-end distributed data store tests that exercise remote shards and transactions.
135 * @author Thomas Pantelis
137 @RunWith(Parameterized.class)
138 public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
140 @Parameters(name = "{0}")
141 public static Collection<Object[]> data() {
142 return Arrays.asList(new Object[][] {
143 { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
148 public Class<? extends AbstractDataStore> testParameter;
150 public int commitTimeout;
    // Shard-name sets passed to initDatastores(); also used to wait for shard leadership.
    private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
    private static final String[] CARS = {"cars"};

    // Akka cluster addresses of the first two test members; all systems join member 1's address.
    private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
            "akka://cluster-test@127.0.0.1:2558");
    private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
            "akka://cluster-test@127.0.0.1:2559");

    // Module-shard configuration resources selecting which members replicate which shards.
    private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
    private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
    private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
    private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
    // One actor system per simulated cluster member; created in setUp(), torn down in tearDown().
    private ActorSystem leaderSystem;
    private ActorSystem followerSystem;
    private ActorSystem follower2System;

    // Leader context: fast heartbeat with a short election timeout factor.
    private final DatastoreContext.Builder leaderDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);

    // Follower context: elections disabled via DisableElectionsRaftPolicy so leadership stays
    // with member 1 unless a test explicitly re-enables elections.
    private final DatastoreContext.Builder followerDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
    private final TransactionIdentifier tx1 = nextTransactionId();
    private final TransactionIdentifier tx2 = nextTransactionId();

    // Datastores and test kits created by initDatastores(); closed in tearDown().
    private AbstractDataStore followerDistributedDataStore;
    private AbstractDataStore leaderDistributedDataStore;
    private IntegrationTestKit followerTestKit;
    private IntegrationTestKit leaderTestKit;
184 public void setUp() {
185 InMemoryJournal.clear();
186 InMemorySnapshotStore.clear();
188 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
189 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
191 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
192 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
194 follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
195 Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
199 public void tearDown() {
200 if (followerDistributedDataStore != null) {
201 leaderDistributedDataStore.close();
203 if (leaderDistributedDataStore != null) {
204 leaderDistributedDataStore.close();
207 TestKit.shutdownActorSystem(leaderSystem);
208 TestKit.shutdownActorSystem(followerSystem);
209 TestKit.shutdownActorSystem(follower2System);
211 InMemoryJournal.clear();
212 InMemorySnapshotStore.clear();
215 private void initDatastoresWithCars(final String type) throws Exception {
216 initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
219 private void initDatastoresWithCarsAndPeople(final String type) throws Exception {
220 initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
223 private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
225 initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder,
226 followerDatastoreContextBuilder);
229 private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards,
230 DatastoreContext.Builder leaderBuilder, DatastoreContext.Builder followerBuilder) throws Exception {
231 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
233 leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
234 testParameter, type, moduleShardsConfig, false, shards);
236 followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout);
237 followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
238 testParameter, type, moduleShardsConfig, false, shards);
240 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
242 leaderTestKit.waitForMembersUp("member-2");
243 followerTestKit.waitForMembersUp("member-1");
246 private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
248 final Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
249 assertTrue("isPresent", optional.isPresent());
251 final CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
252 CarsModel.CAR_QNAME);
253 for (final NormalizedNode<?, ?> entry: entries) {
254 listBuilder.withChild((MapEntryNode) entry);
257 assertEquals("Car list node", listBuilder.build(), optional.get());
260 private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
261 final NormalizedNode<?, ?> expNode) throws Exception {
262 final Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
263 assertTrue("isPresent", optional.isPresent());
264 assertEquals("Data node", expNode, optional.get());
267 private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
269 final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
270 assertEquals("exists", Boolean.TRUE, exists);
274 public void testWriteTransactionWithSingleShard() throws Exception {
275 final String testName = "testWriteTransactionWithSingleShard";
276 initDatastoresWithCars(testName);
278 final String followerCarShardName = "member-2-shard-cars-" + testName;
280 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
281 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
283 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
284 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
286 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
287 final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
288 writeTx.merge(car1Path, car1);
290 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
291 final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
292 writeTx.merge(car2Path, car2);
294 followerTestKit.doCommit(writeTx.ready());
296 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
298 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
302 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
304 writeTx.delete(car1Path);
306 followerTestKit.doCommit(writeTx.ready());
308 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
310 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
312 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
314 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
316 // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
317 // applied and all the log entries from the leader. Since we've verified the car data above we know that
318 // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
319 final AtomicLong leaderLastAppliedIndex = new AtomicLong();
320 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
321 state -> leaderLastAppliedIndex.set(state.getLastApplied()));
323 // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
324 // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
325 // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
326 // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
327 // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
328 // until we find the one that encompasses the leader's lastAppliedIndex.
329 Stopwatch sw = Stopwatch.createStarted();
330 boolean done = false;
332 final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
333 ApplyJournalEntries.class);
334 for (ApplyJournalEntries aje: entries) {
335 if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
341 assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
342 + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
344 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
347 TestKit.shutdownActorSystem(leaderSystem, true);
348 TestKit.shutdownActorSystem(followerSystem, true);
350 final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
352 try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
354 .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
355 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
360 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
361 final String testName = "testWriteTransactionWithSingleShard";
362 initDatastoresWithCars(testName);
364 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
366 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
367 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
368 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
369 followerTestKit.doCommit(writeTx.ready());
372 for (int i = 0; i < numCars; i++) {
373 writeTx = txChain.newWriteOnlyTransaction();
374 writeTx.write(CarsModel.newCarPath("car" + i),
375 CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
377 followerTestKit.doCommit(writeTx.ready());
379 DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
380 domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
382 domStoreReadTransaction.close();
385 // wait to let the shard catch up with purged
386 await("Range set leak test").atMost(5, TimeUnit.SECONDS)
387 .pollInterval(500, TimeUnit.MILLISECONDS)
388 .untilAsserted(() -> {
389 Optional<ActorRef> localShard =
390 leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
391 FrontendShardDataTreeSnapshotMetadata frontendMetadata =
392 (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
393 .executeOperation(localShard.get(), new RequestFrontendMetadata());
395 if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
396 Iterator<FrontendHistoryMetadata> iterator =
397 frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
398 FrontendHistoryMetadata metadata = iterator.next();
399 while (iterator.hasNext() && metadata.getHistoryId() != 1) {
400 metadata = iterator.next();
403 assertEquals(0, metadata.getClosedTransactions().size());
404 assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
405 metadata.getPurgedTransactions().asRanges().iterator().next());
407 // ask based should track no metadata
408 assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
412 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
413 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
414 assertTrue("isPresent", optional.isPresent());
415 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
419 @Ignore("Flushes out tell based leak needs to be handled separately")
420 public void testCloseTransactionMetadataLeak() throws Exception {
421 // Ask based frontend seems to have some issues with back to back close
422 Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
424 final String testName = "testWriteTransactionWithSingleShard";
425 initDatastoresWithCars(testName);
427 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
429 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
430 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
431 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
432 followerTestKit.doCommit(writeTx.ready());
435 for (int i = 0; i < numCars; i++) {
436 writeTx = txChain.newWriteOnlyTransaction();
439 DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
440 domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
442 domStoreReadTransaction.close();
445 writeTx = txChain.newWriteOnlyTransaction();
446 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
447 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
448 followerTestKit.doCommit(writeTx.ready());
450 // wait to let the shard catch up with purged
451 await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
452 .pollInterval(500, TimeUnit.MILLISECONDS)
453 .untilAsserted(() -> {
454 Optional<ActorRef> localShard =
455 leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
456 FrontendShardDataTreeSnapshotMetadata frontendMetadata =
457 (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
458 .executeOperation(localShard.get(), new RequestFrontendMetadata());
460 if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
461 Iterator<FrontendHistoryMetadata> iterator =
462 frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
463 FrontendHistoryMetadata metadata = iterator.next();
464 while (iterator.hasNext() && metadata.getHistoryId() != 1) {
465 metadata = iterator.next();
468 Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
470 assertEquals(0, metadata.getClosedTransactions().size());
471 assertEquals(1, ranges.size());
473 // ask based should track no metadata
474 assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
478 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
479 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
480 assertTrue("isPresent", optional.isPresent());
481 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
485 public void testReadWriteTransactionWithSingleShard() throws Exception {
486 initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
488 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
489 assertNotNull("newReadWriteTransaction returned null", rwTx);
491 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
492 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
494 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
495 rwTx.merge(CarsModel.newCarPath("optima"), car1);
497 verifyCars(rwTx, car1);
499 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
500 final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
501 rwTx.merge(car2Path, car2);
503 verifyExists(rwTx, car2Path);
505 followerTestKit.doCommit(rwTx.ready());
507 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
511 public void testWriteTransactionWithMultipleShards() throws Exception {
512 initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
514 final DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
515 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
517 final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
518 final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
519 writeTx.write(carsPath, carsNode);
521 final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
522 final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
523 writeTx.write(peoplePath, peopleNode);
525 followerTestKit.doCommit(writeTx.ready());
527 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
529 verifyNode(readTx, carsPath, carsNode);
530 verifyNode(readTx, peoplePath, peopleNode);
534 public void testReadWriteTransactionWithMultipleShards() throws Exception {
535 initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
537 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
538 assertNotNull("newReadWriteTransaction returned null", rwTx);
540 final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
541 final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
542 rwTx.write(carsPath, carsNode);
544 final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
545 final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
546 rwTx.write(peoplePath, peopleNode);
548 followerTestKit.doCommit(rwTx.ready());
550 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
552 verifyNode(readTx, carsPath, carsNode);
553 verifyNode(readTx, peoplePath, peopleNode);
557 public void testTransactionChainWithSingleShard() throws Exception {
558 initDatastoresWithCars("testTransactionChainWithSingleShard");
560 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
562 // Add the top-level cars container with write-only.
564 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
565 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
567 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
569 final DOMStoreThreePhaseCommitCohort writeTxReady = writeTx.ready();
571 // Verify the top-level cars container with read-only.
573 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
575 // Perform car operations with read-write.
577 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
579 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
581 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
583 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
584 final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
585 rwTx.write(car1Path, car1);
587 verifyExists(rwTx, car1Path);
589 verifyCars(rwTx, car1);
591 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
592 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
594 rwTx.delete(car1Path);
596 followerTestKit.doCommit(writeTxReady);
598 followerTestKit.doCommit(rwTx.ready());
602 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
606 public void testTransactionChainWithMultipleShards() throws Exception {
607 initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
609 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
611 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
612 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
614 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
615 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
617 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
618 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
620 followerTestKit.doCommit(writeTx.ready());
622 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
624 final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
625 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
626 readWriteTx.write(carPath, car);
628 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
629 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
630 readWriteTx.merge(personPath, person);
632 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
633 assertTrue("isPresent", optional.isPresent());
634 assertEquals("Data node", car, optional.get());
636 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
637 assertTrue("isPresent", optional.isPresent());
638 assertEquals("Data node", person, optional.get());
640 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
642 writeTx = txChain.newWriteOnlyTransaction();
644 writeTx.delete(personPath);
646 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
648 followerTestKit.doCommit(cohort2);
649 followerTestKit.doCommit(cohort3);
653 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
654 verifyCars(readTx, car);
656 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
657 assertFalse("isPresent", optional.isPresent());
661 public void testChainedTransactionFailureWithSingleShard() throws Exception {
662 initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
664 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
665 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
666 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
667 MoreExecutors.directExecutor());
669 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
670 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
672 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
674 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
675 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
676 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
678 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
681 writeTx.commit().get(5, TimeUnit.SECONDS);
682 fail("Expected TransactionCommitFailedException");
683 } catch (final ExecutionException e) {
687 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
694 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
695 initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
697 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
698 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
699 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
700 MoreExecutors.directExecutor());
702 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
703 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
705 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
707 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
709 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
710 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
711 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
713 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
714 // done for put for performance reasons.
715 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
718 writeTx.commit().get(5, TimeUnit.SECONDS);
719 fail("Expected TransactionCommitFailedException");
720 } catch (final ExecutionException e) {
724 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
// Verifies that single-shard transactions keep working across a leader change: writes go through
// a remote tx while member-1 leads, then — after member-1 is shut down and restarted to join as a
// non-leader — the follower becomes leader and subsequent writes switch to a local tx.
731 public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
732 followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
733 final String testName = "testSingleShardTransactionsWithLeaderChanges";
734 initDatastoresWithCars(testName);
// Latch fires once the follower journal has persisted an ApplyJournalEntries for the commit below.
736 final String followerCarShardName = "member-2-shard-cars-" + testName;
737 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class);
739 // Write top-level car container from the follower so it uses a remote Tx.
741 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
743 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
744 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
746 followerTestKit.doCommit(writeTx.ready());
748 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
750 // Switch the leader to the follower
752 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
753 .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
// Take the current leader's actor system down and remove member-1 from the cluster view.
755 TestKit.shutdownActorSystem(leaderSystem, true);
756 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
758 followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
// Recreate member-1 and have it join via the follower's address.
760 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
761 Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
// High election timeout factor keeps the rejoined member-1 from immediately contesting leadership.
763 final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
764 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
765 IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
767 try (AbstractDataStore ds =
768 newMember1TestKit.setupAbstractDataStore(
769 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
771 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
773 // Write a car entry to the new leader - should switch to local Tx
775 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
777 MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
778 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
779 writeTx.merge(car1Path, car1);
781 followerTestKit.doCommit(writeTx.ready());
783 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
// Verifies that a ReadyLocalTransaction sent to a follower shard is forwarded to the leader:
// first with immediate commit (expects CommitTransactionReply), then without immediate commit
// (expects ReadyTransactionReply) followed by an explicit 3PC via ThreePhaseCommitCohortProxy.
// NOTE(review): tx1/tx2 identifiers are declared on lines elided from this excerpt.
787 @SuppressWarnings("unchecked")
789 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
790 initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
791 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
793 final Optional<ActorRef> carsFollowerShard =
794 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
795 assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
// Build the modification against a fresh local data tree with the full schema.
797 final DataTree dataTree = new InMemoryDataTreeFactory().create(
798 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
800 // Send a tx with immediate commit.
802 DataTreeModification modification = dataTree.takeSnapshot().newModification();
803 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
804 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
806 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
807 new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
808 modification.ready();
// doCommitOnReady = true: the follower shard should forward and commit in one round trip.
810 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
812 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
813 Object resp = followerTestKit.expectMsgClass(Object.class);
814 if (resp instanceof akka.actor.Status.Failure) {
815 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
818 assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
// The commit must be visible on the leader.
820 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
822 // Send another tx without immediate commit.
824 modification = dataTree.takeSnapshot().newModification();
825 MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
826 new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
827 modification.ready();
// doCommitOnReady = false: expect only a ready reply carrying the cohort path.
829 readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
831 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
832 resp = followerTestKit.expectMsgClass(Object.class);
833 if (resp instanceof akka.actor.Status.Failure) {
834 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
837 assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
// Drive the three-phase commit manually through the cohort actor returned in the reply.
839 final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
840 ((ReadyTransactionReply)resp).getCohortPath());
842 final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
843 Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
844 ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
845 leaderDistributedDataStore.getActorUtils(), Arrays.asList(
846 new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
847 cohort.canCommit().get(5, TimeUnit.SECONDS);
848 cohort.preCommit().get(5, TimeUnit.SECONDS);
849 cohort.commit().get(5, TimeUnit.SECONDS);
851 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
// Same scenario as testReadyLocalTransactionForwardedToLeader but using a ForwardedReadyTransaction
// message (carrying a ReadWriteShardDataTreeTransaction) instead of ReadyLocalTransaction.
// The shard's own data tree is obtained via GetShardDataTree so modifications apply to live state.
// NOTE(review): tx1/tx2 identifiers and trailing constructor arguments are on lines elided here.
854 @SuppressWarnings("unchecked")
856 public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
857 initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
858 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
860 final Optional<ActorRef> carsFollowerShard =
861 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
862 assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
// Ask the follower shard for its backing DataTree rather than building a fresh one.
864 carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
865 final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
867 // Send a tx with immediate commit.
869 DataTreeModification modification = dataTree.takeSnapshot().newModification();
870 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
871 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
873 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
874 new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
// doCommitOnReady = true: forwarding should end in a CommitTransactionReply.
876 ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
877 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
878 Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
881 carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
882 Object resp = followerTestKit.expectMsgClass(Object.class);
883 if (resp instanceof akka.actor.Status.Failure) {
884 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
887 assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
889 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
891 // Send another tx without immediate commit.
893 modification = dataTree.takeSnapshot().newModification();
894 MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
895 new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
// doCommitOnReady = false: expect a ReadyTransactionReply with a cohort path.
897 forwardedReady = new ForwardedReadyTransaction(tx2,
898 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
899 Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
902 carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
903 resp = followerTestKit.expectMsgClass(Object.class);
904 if (resp instanceof akka.actor.Status.Failure) {
905 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
908 assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
// Complete the commit manually via the three-phase cohort proxy against the leader.
910 ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
911 ((ReadyTransactionReply)resp).getCohortPath());
913 final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
914 Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
915 final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
916 leaderDistributedDataStore.getActorUtils(), Arrays.asList(
917 new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
918 cohort.canCommit().get(5, TimeUnit.SECONDS);
919 cohort.preCommit().get(5, TimeUnit.SECONDS);
920 cohort.commit().get(5, TimeUnit.SECONDS);
922 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
// Verifies that pending transactions queued on a leader that loses leadership are forwarded to,
// and successfully committed by, the new leader. Five transactions in different states (ready,
// canCommit'd, batched, read-write) are staged before the leadership switch.
// NOTE(review): the carIndex counter used below is declared on a line elided from this excerpt.
926 public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
927 // FIXME: remove when test passes also for ClientBackedDataStore
928 Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
// Batch size 2 so multi-car writes split across several BatchedModifications messages.
929 followerDatastoreContextBuilder.shardBatchedModificationCount(2);
930 leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
931 initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
933 // Do an initial write to get the primary shard info cached.
935 final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
936 initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
937 initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
938 followerTestKit.doCommit(initialWriteTx.ready());
940 // Wait for the commit to be replicated to the follower.
942 MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
943 raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
945 MemberNode.verifyRaftState(followerDistributedDataStore, "people",
946 raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
948 // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
951 final DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
952 writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
953 writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
954 final DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
955 final ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
956 writeTx1CanCommit.get(5, TimeUnit.SECONDS);
958 // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
959 // in the leader shard.
961 final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
// Accumulates every car written so the final verifyCars covers all transactions.
962 final LinkedList<MapEntryNode> cars = new LinkedList<>();
964 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
965 writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
967 NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
968 .withChild(PeopleModel.newPersonEntry("Dude")).build();
969 writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
970 final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
972 // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
973 // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
974 // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
977 final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
978 for (int i = 1; i <= 5; i++, carIndex++) {
979 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
980 writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
983 // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
986 final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
987 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
988 writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
991 // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
992 // leader shard on ready.
994 final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
995 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
996 readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
// Sanity check: all five staged transactions are visible on the leader shard.
998 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
999 stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
1001 // Disable elections on the leader so it switches to follower.
1003 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1004 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
1005 .shardElectionTimeoutFactor(10));
1007 leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
1009 // Submit all tx's - the messages should get queued for retry.
1011 final ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
1012 final DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
1013 final DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
1014 final DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
1016 // Enable elections on the other follower so it becomes the leader, at which point the
1017 // tx's should get forwarded from the previous leader to the new leader to complete the commits.
1019 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1020 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
// TimeoutNow forces an immediate election on both shards of the prospective leader.
1021 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
1022 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1023 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
1024 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
// All five transactions must now complete against the new leader.
1026 followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
1027 followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
1028 followerTestKit.doCommit(writeTx3Cohort);
1029 followerTestKit.doCommit(writeTx4Cohort);
1030 followerTestKit.doCommit(rwTxCohort);
1032 DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
1033 verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
1034 verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
// Verifies graceful leader shutdown: with two transactions pending, the leader shard is stopped
// via a Shutdown message, the pending commits still succeed, and leadership transfers so the data
// is readable from the remaining members.
1038 public void testLeadershipTransferOnShutdown() throws Exception {
1039 // FIXME: remove when test passes also for ClientBackedDataStore
1040 Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
1041 leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
1042 followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
1043 final String testName = "testLeadershipTransferOnShutdown";
1044 initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
// Third member so a quorum survives the leader's shutdown.
1046 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
1047 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
1049 try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
1050 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
1052 followerTestKit.waitForMembersUp("member-3");
1053 follower2TestKit.waitForMembersUp("member-1", "member-2");
1055 // Create and submit a couple tx's so they're pending.
1057 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1058 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1059 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
1060 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1061 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Confirm the first cohort is cached (pending) on the leader shard.
1063 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1064 stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
1066 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1067 final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
1068 writeTx.write(CarsModel.newCarPath("optima"), car);
1069 final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
1071 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1072 stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
1074 // Gracefully stop the leader via a Shutdown message.
// Raise the election timeout so no spurious election races the graceful transfer.
1076 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1077 .shardElectionTimeoutFactor(100));
1079 final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1080 final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
1081 final ActorRef leaderActor = Await.result(future, duration);
1083 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
1085 // Commit the 2 transactions. They should finish and succeed.
1087 followerTestKit.doCommit(cohort1);
1088 followerTestKit.doCommit(cohort2);
1090 // Wait for the leader actor stopped.
1092 final Boolean stopped = Await.result(stopFuture, duration);
1093 assertEquals("Stopped", Boolean.TRUE, stopped);
1095 // Verify leadership was transferred by reading the committed data from the other nodes.
1097 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
1098 verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
// Verifies leader behavior while isolated: a tx submitted before isolation stays pending, a tx
// submitted during IsolatedLeader fails with NoShardLeaderException, and both the pending tx and
// a post-reinstatement tx commit once the follower is restored.
1103 public void testTransactionWithIsolatedLeader() throws Exception {
1104 // FIXME: remove when test passes also for ClientBackedDataStore
1105 Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
1106 // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
1107 leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
1108 final String testName = "testTransactionWithIsolatedLeader";
1109 initDatastoresWithCars(testName);
1111 // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
1112 final DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1113 preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1115 // Tx that is submitted after the leader transitions to IsolatedLeader.
1116 final DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1117 noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1119 // Tx that is submitted after the follower is reinstated.
1120 final DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1121 successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1123 // Stop the follower
1124 followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
1125 followerDistributedDataStore.close();
1126 followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
1128 // Submit the preIsolatedLeaderWriteTx so it's pending
1129 final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
1131 // Change the isolated leader check interval low so it changes to IsolatedLeader.
1132 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1133 .shardIsolatedLeaderCheckIntervalInMillis(200));
1135 MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
1136 raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
// While isolated, readying a new tx must fail with NoShardLeaderException as the root cause.
1139 leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
1140 fail("Expected NoShardLeaderException");
1141 } catch (final ExecutionException e) {
1142 assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass());
1145 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1146 .shardElectionTimeoutFactor(100));
1148 final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
// Reinstate the follower; the pending and post-reinstatement txs should now both commit.
1150 followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
1151 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
1153 leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
1154 leaderTestKit.doCommit(successTxCohort);
// Verifies the failure mode when the shard leader stops responding: after the leader's actor
// system is shut down, committing from the follower must fail with NoShardLeaderException or
// ShardLeaderNotRespondingException (ask-based) or RequestTimeoutException (tell-based).
1158 public void testTransactionWithShardLeaderNotResponding() throws Exception {
1159 followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1160 followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
1161 initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
1163 // Do an initial read to get the primary shard info cached.
1165 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1166 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1168 // Shutdown the leader and try to create a new tx.
1170 TestKit.shutdownActorSystem(leaderSystem, true);
// Short operation timeout so the expected failure surfaces quickly.
1172 followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
1173 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
1175 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1177 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1180 followerTestKit.doCommit(rwTx.ready());
1181 fail("Exception expected");
1182 } catch (final ExecutionException e) {
1183 final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
// Expected cause differs between the ask-based (DistributedDataStore) and tell-based protocols.
1184 if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1185 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
1186 || e.getCause() instanceof ShardLeaderNotRespondingException);
1188 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
// Verifies that creating a transaction when no shard leader exists fails with
// NoShardLeaderException (ask-based protocol) or RequestTimeoutException (tell-based),
// after the leader is shut down and member-1 leaves the cluster.
1194 public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
1195 followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1196 initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
1198 // Do an initial read to get the primary shard info cached.
1200 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1201 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1203 // Shutdown the leader and try to create a new tx.
1205 TestKit.shutdownActorSystem(leaderSystem, true);
1207 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
// Brief pause to let the leave propagate before reconfiguring the follower.
1209 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1211 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1212 .operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
1214 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1216 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1219 followerTestKit.doCommit(rwTx.ready());
1220 fail("Exception expected");
1221 } catch (final ExecutionException e) {
1222 final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
// Expected cause depends on which frontend protocol the parameterized run uses.
1223 if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1224 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
1226 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
// Verifies that a transaction whose initial create times out (leader gone) is retried and
// eventually succeeds once a new leader is elected among the remaining members (3-node setup).
1232 public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
1233 followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
1234 String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
1235 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
// Third member with a high election timeout factor so it doesn't grab leadership prematurely.
1237 final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1238 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1239 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1240 follower2System, follower2DatastoreContextBuilder, commitTimeout);
1242 try (AbstractDataStore ds =
1243 follower2TestKit.setupAbstractDataStore(
1244 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
1246 followerTestKit.waitForMembersUp("member-1", "member-3");
1247 follower2TestKit.waitForMembersUp("member-1", "member-2");
1249 // Do an initial read to get the primary shard info cached.
1251 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1252 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1254 // Shutdown the leader and try to create a new tx.
1256 TestKit.shutdownActorSystem(leaderSystem, true);
1258 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
1260 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1261 .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
1263 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1265 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
// Unlike the no-leader tests, this commit is expected to succeed after internal retries.
1267 followerTestKit.doCommit(rwTx.ready());
// Verifies that a semi-reachable member (can send but not receive — simulated by injecting
// RequestVote messages with higher terms from the downed member-3) does not cause the leader or
// follower to bump their term and drop the established leader.
1272 public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
1273 final String testName = "testSemiReachableCandidateNotDroppingLeader";
1274 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1276 final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1277 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1278 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1279 follower2System, follower2DatastoreContextBuilder, commitTimeout);
// NOTE(review): ds2 is not wrapped in try-with-resources here; its close presumably happens
// on lines elided from this excerpt — confirm against the full file.
1281 final AbstractDataStore ds2 =
1282 follower2TestKit.setupAbstractDataStore(
1283 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
1285 followerTestKit.waitForMembersUp("member-1", "member-3");
1286 follower2TestKit.waitForMembersUp("member-1", "member-2");
// Take member-3 down so the other members see it as unreachable.
1288 TestKit.shutdownActorSystem(follower2System);
// Record the leader's raft state before injecting the bogus votes.
1290 ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
1291 OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1292 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1294 Cluster leaderCluster = Cluster.get(leaderSystem);
1295 Cluster followerCluster = Cluster.get(followerSystem);
1296 Cluster follower2Cluster = Cluster.get(follower2System);
1298 Member follower2Member = follower2Cluster.readView().self();
// Wait until both surviving members mark member-3 unreachable.
1300 await().atMost(10, TimeUnit.SECONDS)
1301 .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
1302 await().atMost(10, TimeUnit.SECONDS)
1303 .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
1305 ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
1307 // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
1308 // candidate, we can just send a couple of RequestVotes to both leader and follower.
1309 cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1310 followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1311 cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1312 followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1314 OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1315 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1316 OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
1317 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
// Terms must be unchanged — the bogus RequestVotes must not have dropped the leader.
1319 assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
1320 assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
// Verifies snapshot installation: the leader starts from a pre-seeded saved snapshot, the
// follower starts empty, and the leader must install the snapshot to bring the follower in sync.
// Both members' persisted snapshots are then checked against the expected root.
1326 public void testInstallSnapshot() throws Exception {
1327 final String testName = "testInstallSnapshot";
1328 final String leaderCarShardName = "member-1-shard-cars-" + testName;
1329 final String followerCarShardName = "member-2-shard-cars-" + testName;
1331 // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
1332 // install a snapshot to sync the follower.
1334 DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
1335 SchemaContextHelper.full());
1337 final ContainerNode carsNode = CarsModel.newCarsNode(
1338 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000))));
1339 AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
// Snapshot of the whole tree root, seeded into the in-memory snapshot store before startup.
1341 final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
1342 final Snapshot initialSnapshot = Snapshot.create(
1343 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
1344 Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
1345 InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
// Latches so we can wait for each member to persist its snapshot.
1347 InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
1348 InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
1350 initDatastoresWithCars(testName);
// The leader must expose the data restored from the seeded snapshot.
1352 final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
1353 CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1354 assertTrue("isPresent", readOptional.isPresent());
1355 assertEquals("Node", carsNode, readOptional.get());
1357 verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
1358 initialSnapshot, snapshotRoot);
1360 verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
1361 initialSnapshot, snapshotRoot);
// Verifies message slicing on the tell-based protocol: with the maximum slice size forced down
// to 100 bytes, a write followed by a read-back of the cars subtree must still round-trip.
// NOTE(review): the datastore name string "testLargeReadReplySlicing" differs from the method
// name — it is runtime text and is left as-is; confirm whether the mismatch is intentional.
1365 public void testReadWriteMessageSlicing() throws Exception {
1366 // The slicing is only implemented for tell-based protocol
1367 Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
// Tiny slice size on both ends so even modest payloads are split into multiple slices.
1369 leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
1370 followerDatastoreContextBuilder.maximumMessageSliceSize(100);
1371 initDatastoresWithCars("testLargeReadReplySlicing");
1373 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1375 final NormalizedNode<?, ?> carsNode = CarsModel.create();
1376 rwTx.write(CarsModel.BASE_PATH, carsNode);
// Reading back within the same RW tx exercises the sliced read-reply path.
1378 verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
// Reproduces a commit submitted while the leader loses and then regains contact with its
// followers: AppendEntries messages are dropped on members 2 and 3, a commit is started on
// a background thread, the leader is pushed into a term bump via a forged RequestVote, and
// the test then verifies the pending commit still completes once message dropping stops.
// '@SuppressWarnings("IllegalCatch")' covers the broad 'catch (Exception e)' in the lambda.
// NOTE(review): several structural lines are absent from this extract — the '@Test'
// annotation, the 'try {' opening inside executor.submit, the lambda terminator '});',
// the trailing RequestVote arguments, and the try/finally closing braces. Confirm against
// the full file before editing.
1381 @SuppressWarnings("IllegalCatch")
1383 public void testRaftCallbackDuringLeadershipDrop() throws Exception {
1384 final String testName = "testRaftCallbackDuringLeadershipDrop";
1385 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
// Single-thread executor used to run the blocking doCommit off the test thread.
1387 final ExecutorService executor = Executors.newSingleThreadExecutor();
// Member 3 gets a huge election timeout so it never starts elections on its own;
// only the forged RequestVote below changes terms.
1389 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
1390 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
1391 .shardLeaderElectionTimeoutInSeconds(3600),
// Seed the shard with an empty cars container so subsequent writes have a parent node.
1394 final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1395 initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1396 leaderTestKit.doCommit(initialWriteTx.ready());
1398 try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
1399 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
// Grab the raw shard actors for members 2 and 3 and make them drop AppendEntries,
// simulating followers that cannot hear the leader's heartbeats.
1401 final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
1402 .getLocalShards().get("cars").getActor();
1403 final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
1404 .getLocalShards().get("cars").getActor();
1405 member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
1406 member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
// Submit a commit on the background thread; it cannot replicate while messages are
// dropped, so submitDone flips only after connectivity is restored.
1408 final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1409 newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
1410 final AtomicBoolean submitDone = new AtomicBoolean(false);
1411 executor.submit(() -> {
1413 leaderTestKit.doCommit(newTx.ready());
1414 submitDone.set(true);
1415 } catch (Exception e) {
1416 throw new RuntimeException(e);
// Wait until the leader has appended the new entry locally (lastIndex >= 1)
// before injecting the term change.
1419 final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
1420 .getLocalShards().get("cars").getActor();
1421 await().atMost(10, TimeUnit.SECONDS)
1422 .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1423 .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
1425 final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
1426 .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
1428 // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
1429 // new term(switching to candidate after election timeout)
1430 leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
1431 "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
// Restore connectivity so replication can resume and the pending commit can finish.
1434 member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
1435 member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
// The commit must eventually succeed — i.e. the tx must not stay stuck in COMMIT_PENDING.
1437 await("Is tx stuck in COMMIT_PENDING")
1438 .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
1442 executor.shutdownNow();
// Verifies the snapshotOnRootOverwrite feature: ordinary car writes only move the
// in-memory ("fake") snapshot index, while an overwrite of the root node forces a real
// persisted snapshot capturing everything up to that point.
// NOTE(review): this extract is missing the '@Test' annotation, the early 'return;' after
// the FIXME branch, the '.build();' terminating the rootNode builder, the for-loop closing
// brace, and the method's closing brace — confirm against the full file before editing.
1446 @Ignore("Writes to root node are not split into shards")
1447 public void testSnapshotOnRootOverwrite() throws Exception {
// Only runs for DistributedDataStore; see FIXME below for why ClientBackedDatastore is excluded.
1448 if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
1449 // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
1453 final String testName = "testSnapshotOnRootOverwrite";
1454 String[] shards = {"cars", "default"};
// Both members opt into snapshot-on-root-overwrite.
1455 initDatastores(testName, "module-shards-default-cars-member1.conf", shards,
1456 leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
1457 followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
1459 leaderTestKit.waitForMembersUp("member-2");
// A root container holding the cars subtree, written at the empty (root) path below.
1460 ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
1461 .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
1462 .withChild((ContainerNode) CarsModel.create())
// First root overwrite: expect a real snapshot at index 0 on both members.
1465 leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
1467 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
1468 state -> assertEquals(0, state.getSnapshotIndex()));
1470 IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
1471 state -> assertEquals(0, state.getSnapshotIndex()));
1473 verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0);
1474 verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0);
// Ten ordinary (non-root) writes; these advance the log but must not persist a new snapshot.
1476 for (int i = 0; i < 10; i++) {
1477 leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i),
1478 CarsModel.newCarEntry("car " + i, Uint64.ONE));
1481 // fake snapshot causes the snapshotIndex to move
1482 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
1483 state -> assertEquals(9, state.getSnapshotIndex()));
1484 IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
1485 state -> assertEquals(9, state.getSnapshotIndex()));
1487 // however the real snapshot still has not changed and was taken at index 0
1488 verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0);
1489 verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0);
1491 // root overwrite so expect a snapshot
1492 leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
1494 // this was a real snapshot so everything should be in it(1 + 10 + 1)
1495 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
1496 state -> assertEquals(11, state.getSnapshotIndex()));
1497 IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
1498 state -> assertEquals(11, state.getSnapshotIndex()));
1500 verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 11);
1501 verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 11);
// Polls the in-memory snapshot store (up to 5 seconds) until exactly one Snapshot exists
// for the given persistence id and its lastAppliedIndex matches the expected value.
// NOTE(review): the closing '});' and method brace are absent from this extract.
1504 private void verifySnapshot(String persistenceId, long lastAppliedIndex) {
1505 await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
1506 List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
// Exactly one persisted snapshot is expected; more or fewer fails the assertion and retries.
1507 assertEquals(1, snap.size());
1508 assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
// Asserts that 'actual' matches 'expected' on all Raft log/term metadata and that its
// state is a ShardSnapshotState whose deserialized root node equals 'expRoot'.
// NOTE(review): the method's closing brace is absent from this extract. Also note the
// unguarded Optional.get() on getRootNode() — an empty Optional would surface as
// NoSuchElementException rather than an assertion failure.
1513 private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
1514 final NormalizedNode<?, ?> expRoot) {
1515 assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
1516 assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
1517 assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
1518 assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
// The persisted state must be the shard-specific wrapper before we can unwrap the data tree.
1519 assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
1520 MetadataShardDataTreeSnapshot shardSnapshot =
1521 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
1522 assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
1525 private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
1526 final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
1527 final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
1528 final Answer<DatastoreContext> answer = invocation -> newBuilder.build();
1529 Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
1530 Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
1531 dataStore.onDatastoreContextUpdated(mockContextFactory);