2 * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.awaitility.Awaitility.await;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.MatcherAssert.assertThat;
14 import static org.hamcrest.Matchers.equalTo;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertFalse;
17 import static org.junit.Assert.assertNotNull;
18 import static org.junit.Assert.assertThrows;
19 import static org.junit.Assert.assertTrue;
20 import static org.junit.Assume.assumeTrue;
21 import static org.mockito.ArgumentMatchers.any;
22 import static org.mockito.ArgumentMatchers.anyString;
23 import static org.mockito.ArgumentMatchers.eq;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.timeout;
27 import static org.mockito.Mockito.verify;
29 import akka.actor.ActorRef;
30 import akka.actor.ActorSelection;
31 import akka.actor.ActorSystem;
32 import akka.actor.Address;
33 import akka.actor.AddressFromURIString;
34 import akka.cluster.Cluster;
35 import akka.cluster.Member;
36 import akka.dispatch.Futures;
37 import akka.pattern.Patterns;
38 import akka.testkit.javadsl.TestKit;
39 import com.google.common.base.Stopwatch;
40 import com.google.common.base.Throwables;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.MoreExecutors;
44 import com.google.common.util.concurrent.Uninterruptibles;
45 import com.typesafe.config.ConfigFactory;
46 import java.util.Arrays;
47 import java.util.Collection;
48 import java.util.Collections;
49 import java.util.LinkedList;
50 import java.util.List;
51 import java.util.Optional;
52 import java.util.concurrent.ExecutionException;
53 import java.util.concurrent.ExecutorService;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.atomic.AtomicBoolean;
57 import java.util.concurrent.atomic.AtomicLong;
58 import org.junit.After;
59 import org.junit.Before;
60 import org.junit.Test;
61 import org.junit.runner.RunWith;
62 import org.junit.runners.Parameterized;
63 import org.junit.runners.Parameterized.Parameter;
64 import org.junit.runners.Parameterized.Parameters;
65 import org.mockito.stubbing.Answer;
66 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
67 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
68 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
69 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
70 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
71 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
72 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
73 import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
74 import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
75 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
76 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
77 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
78 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
79 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
80 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
81 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
82 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
83 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
84 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
85 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
86 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
87 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
88 import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongBitmap;
89 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
91 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
93 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
94 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
95 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
96 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
97 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
98 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
99 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
100 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
101 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
102 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
103 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
104 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
105 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
106 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
107 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
108 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
109 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
110 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
111 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
112 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
113 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
114 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
115 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
116 import org.opendaylight.yangtools.yang.common.Uint64;
117 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
118 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
119 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
120 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
121 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
122 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
125 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
126 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
127 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
128 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
129 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
130 import scala.collection.Set;
131 import scala.concurrent.Await;
132 import scala.concurrent.Future;
133 import scala.concurrent.duration.FiniteDuration;
136 * End-to-end distributed data store tests that exercise remote shards and transactions.
138 * @author Thomas Pantelis
140 @RunWith(Parameterized.class)
141 public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
143 @Parameters(name = "{0}")
144 public static Collection<Object[]> data() {
145 return Arrays.asList(new Object[][] {
146 { TestDistributedDataStore.class, 7 }, { TestClientBackedDataStore.class, 12 }
// Datastore implementation class under test; handed to setupAbstractDataStore() in initDatastores().
// NOTE(review): presumably bound to column 0 of data() by the Parameterized runner -- the binding
// annotation is not visible here; confirm.
151 public Class<? extends AbstractDataStore> testParameter;
// Third argument of the IntegrationTestKit constructor calls in initDatastores().
// NOTE(review): presumably bound to column 1 of data(); its unit is not visible here -- confirm.
153 public int commitTimeout;
// Shard name sets passed to initDatastores().
155 private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
156 private static final String[] CARS = {"cars"};
// Address every actor system joins in setUp().
158 private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
159 "akka://cluster-test@127.0.0.1:2558");
// Address the re-created member 1 joins in testSingleShardTransactionsWithLeaderChanges().
160 private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
161 "akka://cluster-test@127.0.0.1:2559");
// Module-shards configuration file names passed to setupAbstractDataStore().
163 private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
164 private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
165 private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
166 private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
// Actor systems created in setUp() from the "Member1", "Member2" and "Member3" configurations.
168 private ActorSystem leaderSystem;
169 private ActorSystem followerSystem;
170 private ActorSystem follower2System;
// Default context for the leader-side datastore: 100 ms heartbeat, election timeout factor 2.
172 private final DatastoreContext.Builder leaderDatastoreContextBuilder =
173 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
// Default context for the follower-side datastore: 100 ms heartbeat, election timeout factor 5 and
// the DisableElectionsRaftPolicy (presumably so the follower never starts an election -- confirm).
175 private final DatastoreContext.Builder followerDatastoreContextBuilder =
176 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
177 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
// Pre-allocated transaction identifiers; not referenced in the part of the class visible here.
178 private final TransactionIdentifier tx1 = nextTransactionId();
179 private final TransactionIdentifier tx2 = nextTransactionId();
// Datastores and their test kits; assigned by initDatastores(), closed in tearDown().
181 private AbstractDataStore followerDistributedDataStore;
182 private AbstractDataStore leaderDistributedDataStore;
183 private IntegrationTestKit followerTestKit;
184 private IntegrationTestKit leaderTestKit;
187 public void setUp() {
188 InMemoryJournal.clear();
189 InMemorySnapshotStore.clear();
191 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
192 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
194 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
195 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
197 follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
198 Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
202 public void tearDown() {
203 if (followerDistributedDataStore != null) {
204 leaderDistributedDataStore.close();
206 if (leaderDistributedDataStore != null) {
207 leaderDistributedDataStore.close();
210 TestKit.shutdownActorSystem(leaderSystem, true);
211 TestKit.shutdownActorSystem(followerSystem, true);
212 TestKit.shutdownActorSystem(follower2System,true);
214 InMemoryJournal.clear();
215 InMemorySnapshotStore.clear();
218 private void initDatastoresWithCars(final String type) throws Exception {
219 initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
222 private void initDatastoresWithCarsAndPeople(final String type) throws Exception {
223 initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
226 private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
228 initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder,
229 followerDatastoreContextBuilder);
232 private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards,
233 final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder)
235 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
237 leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
238 testParameter, type, moduleShardsConfig, false, shards);
240 followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout);
241 followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
242 testParameter, type, moduleShardsConfig, false, shards);
244 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
246 leaderTestKit.waitForMembersUp("member-2");
247 followerTestKit.waitForMembersUp("member-1");
250 private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
252 assertEquals("Car list node",
253 Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()),
254 readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS));
257 private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
258 final NormalizedNode expNode) throws Exception {
259 assertEquals(Optional.of(expNode), readTx.read(path).get(5, TimeUnit.SECONDS));
262 private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
264 assertEquals("exists", Boolean.TRUE, readTx.exists(path).get(5, TimeUnit.SECONDS));
/**
 * Commits write-only transactions to the cars shard through the follower datastore, checks the result on
 * both follower and leader, then restarts member 2 on its own to check that the data was persisted.
 */
268 public void testWriteTransactionWithSingleShard() throws Exception {
269 final String testName = "testWriteTransactionWithSingleShard";
270 initDatastoresWithCars(testName);
// Journal key of the follower's cars shard; used below to inspect what the follower persisted.
272 final String followerCarShardName = "member-2-shard-cars-" + testName;
274 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
275 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
277 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
278 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
280 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
281 final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
282 writeTx.merge(car1Path, car1);
284 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
285 final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
286 writeTx.merge(car2Path, car2);
288 followerTestKit.doCommit(writeTx.ready());
// Both cars must be readable from either member after the commit.
290 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
292 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
// Second transaction: delete car1 so that only car2 remains.
296 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
298 writeTx.delete(car1Path);
300 followerTestKit.doCommit(writeTx.ready());
302 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
304 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
306 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
308 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
310 // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
311 // applied and all the log entries from the leader. Since we've verified the car data above we know that
312 // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
313 final AtomicLong leaderLastAppliedIndex = new AtomicLong();
314 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
315 state -> leaderLastAppliedIndex.set(state.getLastApplied()));
317 // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
318 // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
319 // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
320 // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
321 // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
322 // until we find the one that encompasses the leader's lastAppliedIndex.
323 Stopwatch sw = Stopwatch.createStarted();
324 boolean done = false;
// NOTE(review): `done` is never read or set in the lines visible here and nothing encloses the
// 50 ms sleep below, so the polling-loop header and its exit statements appear to be missing from
// this listing -- confirm against the original file.
326 final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
327 ApplyJournalEntries.class);
328 for (ApplyJournalEntries aje: entries) {
329 if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
// Fails once more than 5 whole seconds have elapsed without finding a matching entry.
335 assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
336 + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
338 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// Stop both original members so that the new member-2 system below runs on its own.
341 TestKit.shutdownActorSystem(leaderSystem, true);
342 TestKit.shutdownActorSystem(followerSystem, true);
344 final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
// NOTE(review): the constructor call below ends on a trailing comma; its last argument is not
// visible in this listing -- confirm.
346 try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
348 .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
// The restarted member must still hold car2.
349 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
/**
 * Commits a run of single-car write transactions on one transaction chain from the follower, then polls
 * the leader's cars shard until its frontend metadata shows the expected purged range.
 */
354 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
355 initDatastoresWithCars("testSingleTransactionsWritesInQuickSuccession");
357 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
// Seed the cars container and the empty car list.
359 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
360 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
361 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
362 followerTestKit.doCommit(writeTx.ready());
// NOTE(review): `numCars` is used below but its declaration is not visible in this listing -- confirm.
365 for (int i = 0; i < numCars; i++) {
366 writeTx = txChain.newWriteOnlyTransaction();
367 writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
368 followerTestKit.doCommit(writeTx.ready());
// A read-only transaction on the same chain after each write; its result is discarded.
370 try (var tx = txChain.newReadOnlyTransaction()) {
371 tx.read(CarsModel.BASE_PATH).get();
375 // wait to let the shard catch up with purged
376 await("Range set leak test").atMost(5, TimeUnit.SECONDS)
377 .pollInterval(500, TimeUnit.MILLISECONDS)
378 .untilAsserted(() -> {
// NOTE(review): the findLocalShard(...) call chain is cut off here; localShard is passed straight to
// executeOperation below, so an unwrapping call presumably follows -- confirm.
379 final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
381 final var frontendMetadata =
382 (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
383 .executeOperation(localShard, new RequestFrontendMetadata());
// Only the first client entry is inspected.
385 final var clientMeta = frontendMetadata.getClients().get(0);
386 if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
387 assertTellClientMetadata(clientMeta, numCars * 2);
389 assertAskClientMetadata(clientMeta);
// Finally the car list read through the chain must hold numCars entries.
393 try (var tx = txChain.newReadOnlyTransaction()) {
394 final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
395 assertThat(body, instanceOf(Collection.class));
396 assertEquals(numCars, ((Collection<?>) body).size());
400 private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
401 // ask based should track no metadata
402 assertEquals(List.of(), clientMeta.getCurrentHistories());
405 private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
406 final var iterator = clientMeta.getCurrentHistories().iterator();
407 var metadata = iterator.next();
408 while (iterator.hasNext() && metadata.getHistoryId() != 1) {
409 metadata = iterator.next();
412 assertEquals(UnsignedLongBitmap.of(), metadata.getClosedTransactions());
413 assertEquals("[[0.." + lastPurged + "]]", metadata.getPurgedTransactions().ranges().toString());
/**
 * Opens and closes write-only transactions on a chain without committing them, then polls the leader's
 * cars shard until its frontend metadata shows the expected purged range.
 */
417 public void testCloseTransactionMetadataLeak() throws Exception {
418 // FIXME: CONTROLLER-2016: ask-based frontend triggers this:
420 // java.lang.IllegalStateException: Previous transaction
421 // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet
422 // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady()
423 // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction()
// NOTE(review): Class.isAssignableFrom(X) is true only when the receiver is X itself or a supertype of
// X. testParameter is one of the Test* classes from data(); if those are subclasses of the production
// datastores this assumption can never hold and the test is always skipped. The intended check is
// presumably ClientBackedDataStore.class.isAssignableFrom(testParameter) -- confirm.
424 assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class));
426 initDatastoresWithCars("testCloseTransactionMetadataLeak");
428 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
// Seed the cars container and the empty car list.
430 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
431 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
432 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
433 followerTestKit.doCommit(writeTx.ready());
// NOTE(review): `numCars` is used below but its declaration is not visible in this listing -- confirm.
436 for (int i = 0; i < numCars; i++) {
// The write-only transaction is opened in try-with-resources; no write on it is visible here.
437 try (var tx = txChain.newWriteOnlyTransaction()) {
441 try (var tx = txChain.newReadOnlyTransaction()) {
442 tx.read(CarsModel.BASE_PATH).get();
446 // wait to let the shard catch up with purged
447 await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
448 .pollInterval(500, TimeUnit.MILLISECONDS)
449 .untilAsserted(() -> {
// NOTE(review): the findLocalShard(...) call chain is cut off here; localShard is passed straight to
// executeOperation below, so an unwrapping call presumably follows -- confirm.
450 final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
452 final var frontendMetadata =
453 (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
454 .executeOperation(localShard, new RequestFrontendMetadata());
// Only the first client entry is inspected.
456 final var clientMeta = frontendMetadata.getClients().get(0);
457 if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
458 assertTellClientMetadata(clientMeta, numCars * 2);
460 assertAskClientMetadata(clientMeta);
466 public void testReadWriteTransactionWithSingleShard() throws Exception {
467 initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
469 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
470 assertNotNull("newReadWriteTransaction returned null", rwTx);
472 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
473 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
475 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
476 rwTx.merge(CarsModel.newCarPath("optima"), car1);
478 verifyCars(rwTx, car1);
480 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
481 final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
482 rwTx.merge(car2Path, car2);
484 verifyExists(rwTx, car2Path);
486 followerTestKit.doCommit(rwTx.ready());
488 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
492 public void testWriteTransactionWithMultipleShards() throws Exception {
493 initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
495 final DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
496 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
498 final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
499 final NormalizedNode carsNode = CarsModel.emptyContainer();
500 writeTx.write(carsPath, carsNode);
502 final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
503 final NormalizedNode peopleNode = PeopleModel.emptyContainer();
504 writeTx.write(peoplePath, peopleNode);
506 followerTestKit.doCommit(writeTx.ready());
508 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
510 verifyNode(readTx, carsPath, carsNode);
511 verifyNode(readTx, peoplePath, peopleNode);
515 public void testReadWriteTransactionWithMultipleShards() throws Exception {
516 initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
518 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
519 assertNotNull("newReadWriteTransaction returned null", rwTx);
521 final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
522 final NormalizedNode carsNode = CarsModel.emptyContainer();
523 rwTx.write(carsPath, carsNode);
525 final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
526 final NormalizedNode peopleNode = PeopleModel.emptyContainer();
527 rwTx.write(peoplePath, peopleNode);
529 followerTestKit.doCommit(rwTx.ready());
531 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
533 verifyNode(readTx, carsPath, carsNode);
534 verifyNode(readTx, peoplePath, peopleNode);
538 public void testTransactionChainWithSingleShard() throws Exception {
539 initDatastoresWithCars("testTransactionChainWithSingleShard");
541 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
543 // Add the top-level cars container with write-only.
545 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
546 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
548 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
550 final DOMStoreThreePhaseCommitCohort writeTxReady = writeTx.ready();
552 // Verify the top-level cars container with read-only.
554 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
556 // Perform car operations with read-write.
558 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
560 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
562 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
564 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
565 final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
566 rwTx.write(car1Path, car1);
568 verifyExists(rwTx, car1Path);
570 verifyCars(rwTx, car1);
572 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
573 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
575 rwTx.delete(car1Path);
577 followerTestKit.doCommit(writeTxReady);
579 followerTestKit.doCommit(rwTx.ready());
583 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
587 public void testTransactionChainWithMultipleShards() throws Exception {
588 initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
590 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
592 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
593 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
595 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
596 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
598 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
599 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
601 followerTestKit.doCommit(writeTx.ready());
603 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
605 final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
606 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
607 readWriteTx.write(carPath, car);
609 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
610 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
611 readWriteTx.merge(personPath, person);
613 assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
614 assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
616 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
618 writeTx = txChain.newWriteOnlyTransaction();
620 writeTx.delete(personPath);
622 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
624 followerTestKit.doCommit(cohort2);
625 followerTestKit.doCommit(cohort3);
629 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
630 verifyCars(readTx, car);
632 assertEquals(Optional.empty(), readTx.read(personPath).get(5, TimeUnit.SECONDS));
/**
 * Commits invalid data through a broker-level transaction chain backed by the follower datastore and
 * expects the commit to fail and the chain listener to be notified.
 */
636 public void testChainedTransactionFailureWithSingleShard() throws Exception {
637 initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
// Broker with the follower datastore as its only (CONFIGURATION) store and a direct executor.
639 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
640 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
641 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
642 MoreExecutors.directExecutor());
644 final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
645 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
647 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
// Merge a cars container holding a leaf named after TestModel.JUNK_QNAME.
// NOTE(review): the builder chain is cut off after withChild(...); the closing build call is not
// visible in this listing -- confirm.
649 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
650 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
651 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
// NOTE(review): the statement below has no visible terminator, and `ex` is then asserted to be a
// TransactionCommitFailedException although assertThrows returns the ExecutionException; presumably
// the cause is unwrapped on a line missing here -- confirm.
654 final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
656 assertThat(ex, instanceOf(TransactionCommitFailedException.class));
// The listener must be told about the failure within 5000 ms.
658 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
/**
 * Same failure scenario as the single-shard variant, but the transaction also puts a valid people
 * container so that it touches both the people and the cars data.
 */
665 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
666 initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
// Broker with the follower datastore as its only (CONFIGURATION) store and a direct executor.
668 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
669 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
670 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
671 MoreExecutors.directExecutor());
673 final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
674 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
676 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
678 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
680 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
681 // done for put for performance reasons.
// NOTE(review): the builder chain is cut off after withChild(...); the closing build call is not
// visible in this listing -- confirm.
682 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
683 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
684 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
// NOTE(review): the statement below has no visible terminator, and `ex` is then asserted to be a
// TransactionCommitFailedException although assertThrows returns the ExecutionException; presumably
// the cause is unwrapped on a line missing here -- confirm.
687 final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
689 assertThat(ex, instanceOf(TransactionCommitFailedException.class));
// The listener must be told about the failure within 5000 ms.
691 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
/**
 * Commits to the cars shard from the follower, then shuts the leader's actor system down, waits for
 * member 2 to become the shard leader, restarts member 1 and commits again through member 2.
 */
698 public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
// Mutates the shared follower builder before the datastores are created from it.
699 followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
700 final String testName = "testSingleShardTransactionsWithLeaderChanges";
701 initDatastoresWithCars(testName);
// Latch on one ApplyJournalEntries write to the follower's journal; awaited after the commit below.
703 final String followerCarShardName = "member-2-shard-cars-" + testName;
704 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class);
706 // Write top-level car container from the follower so it uses a remote Tx.
708 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
710 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
711 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
713 followerTestKit.doCommit(writeTx.ready());
715 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
717 // Switch the leader to the follower
// Election timeout factor 1 and no custom raft policy: this removes the DisableElectionsRaftPolicy the
// follower builder was created with.
719 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
720 .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
// Take member 1 down and remove it from the follower's cluster view.
722 TestKit.shutdownActorSystem(leaderSystem, true);
723 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
725 followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
// Bring member 1 back as a fresh actor system, this time joining via member 2's address.
727 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
728 Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
// The new member 1 gets its own builder with election timeout factor 5.
730 final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
731 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
732 IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
// The new member-1 datastore is only needed while the block runs; try-with-resources closes it.
734 try (AbstractDataStore ds =
735 newMember1TestKit.setupAbstractDataStore(
736 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
738 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
740 // Write a car entry to the new leader - should switch to local Tx
742 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
744 MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
745 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
746 writeTx.merge(car1Path, car1);
748 followerTestKit.doCommit(writeTx.ready());
750 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
// Sends ReadyLocalTransaction messages directly to the follower datastore's local "cars" shard actor and checks
// both the reply type and that the written cars become readable from the leader datastore. Two variants are
// covered: immediate commit (CommitTransactionReply expected) and no immediate commit (ReadyTransactionReply
// expected, followed by an explicit three-phase commit through a ThreePhaseCommitCohortProxy).
// NOTE(review): tx1 and tx2 are not declared in this method - presumably transaction identifiers declared
// elsewhere in the class.
755 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
756 initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
757 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
759 final Optional<ActorRef> carsFollowerShard =
760 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
761 assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
// A stand-alone in-memory data tree is used only to build the modifications carried by the messages.
763 final DataTree dataTree = new InMemoryDataTreeFactory().create(
764 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
766 // Send a tx with immediate commit.
768 DataTreeModification modification = dataTree.takeSnapshot().newModification();
769 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
770 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
772 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
773 new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
774 modification.ready();
776 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
778 carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
// Accept any reply class first so a Status.Failure can be rethrown with its cause instead of a type mismatch.
779 Object resp = followerTestKit.expectMsgClass(Object.class);
780 if (resp instanceof akka.actor.Status.Failure) {
781 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
784 assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
786 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
788 // Send another tx without immediate commit.
790 modification = dataTree.takeSnapshot().newModification();
791 MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
792 new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
793 modification.ready();
795 readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
797 carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
798 resp = followerTestKit.expectMsgClass(Object.class);
799 if (resp instanceof akka.actor.Status.Failure) {
800 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
803 assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
// The reply carries the cohort path; it is resolved through the leader datastore's ActorUtils and the
// three commit phases are then driven explicitly.
805 final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
806 ((ReadyTransactionReply)resp).getCohortPath());
808 ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(),
809 List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
810 () -> DataStoreVersions.CURRENT_VERSION)), tx2);
811 cohort.canCommit().get(5, TimeUnit.SECONDS);
812 cohort.preCommit().get(5, TimeUnit.SECONDS);
813 cohort.commit().get(5, TimeUnit.SECONDS);
815 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
// Same scenario as the ReadyLocalTransaction test, but using ForwardedReadyTransaction messages sent to the
// follower datastore's local "cars" shard actor. The modifications are built on the DataTree returned by that
// shard in response to GetShardDataTree. Covers immediate commit (CommitTransactionReply expected) and deferred
// commit (ReadyTransactionReply expected, then an explicit three-phase commit).
// NOTE(review): tx1 and tx2 are not declared in this method - presumably transaction identifiers declared
// elsewhere in the class.
819 public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
820 initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
821 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
823 final Optional<ActorRef> carsFollowerShard =
824 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
825 assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
// Ask the shard actor for its DataTree; the reply is received on the test kit's ref.
827 carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
828 final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
830 // Send a tx with immediate commit.
832 DataTreeModification modification = dataTree.takeSnapshot().newModification();
833 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
834 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
836 final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
837 new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
// Unlike the ReadyLocalTransaction test, the modification is not ready()-ed here before being wrapped.
839 ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION,
840 new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
841 true, Optional.empty());
843 carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
// Accept any reply class first so a Status.Failure can be rethrown with its cause instead of a type mismatch.
844 Object resp = followerTestKit.expectMsgClass(Object.class);
845 if (resp instanceof akka.actor.Status.Failure) {
846 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
849 assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
851 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
853 // Send another tx without immediate commit.
855 modification = dataTree.takeSnapshot().newModification();
856 MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
857 new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
859 forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION,
860 new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
861 false, Optional.empty());
863 carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
864 resp = followerTestKit.expectMsgClass(Object.class);
865 if (resp instanceof akka.actor.Status.Failure) {
866 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
869 assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
// Resolve the cohort path from the reply via the leader datastore and drive the three commit phases.
871 ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
872 ((ReadyTransactionReply)resp).getCohortPath());
874 final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
875 leaderDistributedDataStore.getActorUtils(), List.of(
876 new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
877 () -> DataStoreVersions.CURRENT_VERSION)), tx2);
878 cohort.canCommit().get(5, TimeUnit.SECONDS);
879 cohort.preCommit().get(5, TimeUnit.SECONDS);
880 cohort.commit().get(5, TimeUnit.SECONDS);
882 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
// Prepares several transactions from the follower datastore (two multi-shard write-only, two single-shard
// write-only, one read-write), then disables elections on the current leader so it steps down, readies/submits
// the pending transactions, re-enables elections on the follower and finally commits everything. The cars shard
// read-write transaction counters of both datastores are checked at each stage, and the resulting cars and
// people data are read back from the leader datastore.
// NOTE(review): 'carIndex' is used throughout but its declaration and some increments are not visible in this
// view of the method.
886 public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
// A batch size of 2 matters for the message counts described in the comments further down.
887 followerDatastoreContextBuilder.shardBatchedModificationCount(2);
888 leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
889 initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
891 // Verify backend statistics on start
892 verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
893 verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
895 // Do an initial write to get the primary shard info cached.
897 final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
898 initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
899 initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
900 followerTestKit.doCommit(initialWriteTx.ready());
902 // Wait for the commit to be replicated to the follower.
904 MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
905 raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
907 MemberNode.verifyRaftState(followerDistributedDataStore, "people",
908 raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
910 // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
913 final DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
914 writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
915 writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
916 final DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
917 final ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
918 writeTx1CanCommit.get(5, TimeUnit.SECONDS);
920 // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
921 // in the leader shard.
923 final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
// 'cars' accumulates every car entry written by the transactions below, in order, for the final verifyCars().
924 final LinkedList<MapEntryNode> cars = new LinkedList<>();
926 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
927 writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
929 NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
930 .withChild(PeopleModel.newPersonEntry("Dude")).build();
931 writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
932 final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
934 // At this point only leader should see the transactions
935 verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
936 verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
938 // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
939 // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
940 // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready.
942 final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
943 for (int i = 1; i <= 5; i++, carIndex++) {
944 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
945 writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
948 // Prepare another WO that writes to a single shard. This will send a single BatchedModifications message
951 final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
952 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
953 writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
956 // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the leader shard
959 final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
960 cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
961 final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
962 readWriteTx.write(carPath, cars.getLast());
964 // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
965 // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
966 // transactions eagerly.
967 final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
968 verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
969 verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
971 // Disable elections on the leader so it switches to follower.
973 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
974 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
975 .shardElectionTimeoutFactor(10));
977 leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
979 // Submit all tx's - the messages should get queued for retry.
981 final ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
982 final DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
983 final DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
984 final DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
986 // Enable elections on the other follower so it becomes the leader, at which point the
987 // tx's should get forwarded from the previous leader to the new leader to complete the commits.
989 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
990 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
// TimeoutNow is sent to both local follower shards, presumably to trigger an election immediately
// rather than waiting for the election timeout.
991 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
992 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
993 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
994 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
// The first two commits reuse the canCommit futures obtained earlier; the rest start from the cohort.
996 followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
997 followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
998 followerTestKit.doCommit(writeTx3Cohort);
999 followerTestKit.doCommit(writeTx4Cohort);
1000 followerTestKit.doCommit(rwTxCohort);
1002 // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
1003 // only see the initial transactions
1004 verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
1005 verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
1007 DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
1008 verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
1009 verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
// Asserts, through IntegrationTestKit.verifyShardStats, that the "cars" shard statistics of the given datastore
// report 'expected' as the value of getReadWriteTransactionCount().
//   datastore - the datastore whose "cars" shard statistics are inspected
//   expected  - the expected read-write transaction count
1012 private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
1014 IntegrationTestKit.verifyShardStats(datastore, "cars",
1015 stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
// Readies two transactions from the follower datastore in a three-member setup, sends a graceful Shutdown to the
// leader's local "cars" shard actor, then commits both transactions and verifies that the actor stopped and that
// the written car is readable from both remaining datastores.
1019 public void testLeadershipTransferOnShutdown() throws Exception {
1020 leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
1021 followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
1022 final String testName = "testLeadershipTransferOnShutdown";
1023 initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
// Third member: built from the follower's datastore context with a 500 ms operation timeout.
1025 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
1026 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
1028 try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
1029 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
1031 followerTestKit.waitForMembersUp("member-3");
1032 follower2TestKit.waitForMembersUp("member-1", "member-2");
1034 // Create and submit a couple tx's so they're pending.
1036 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1037 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1038 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
1039 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1040 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// NOTE(review): 'usesCohorts' is not referenced in the visible lines - presumably it guards the
// getTxCohortCacheSize checks below; confirm against the full file.
1042 final var usesCohorts = DistributedDataStore.class.isAssignableFrom(testParameter);
1044 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1045 stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
1048 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1049 final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
1050 writeTx.write(CarsModel.newCarPath("optima"), car);
1051 final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
1054 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1055 stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
1058 // Gracefully stop the leader via a Shutdown message.
1060 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1061 .shardElectionTimeoutFactor(100));
1063 final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1064 final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
1065 final ActorRef leaderActor = Await.result(future, duration);
// gracefulStop sends Shutdown.INSTANCE to the actor; the returned future is awaited only after the commits.
1067 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
1069 // Commit the 2 transactions. They should finish and succeed.
1071 followerTestKit.doCommit(cohort1);
1072 followerTestKit.doCommit(cohort2);
1074 // Wait for the leader actor stopped.
1076 final Boolean stopped = Await.result(stopFuture, duration);
1077 assertEquals("Stopped", Boolean.TRUE, stopped);
1079 // Verify leadership was transferred by reading the committed data from the other nodes.
1081 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
1082 verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
// Prepares three write-only transactions on the leader datastore, closes the follower datastore, and then
// readies them at different points: before the leader reports IsolatedLeader, while it is IsolatedLeader, and
// around the time the follower datastore is set up again. The transaction readied while isolated is expected
// to fail; the other two are expected to commit.
1087 public void testTransactionWithIsolatedLeader() throws Exception {
1088 // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
1089 leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
1090 final String testName = "testTransactionWithIsolatedLeader";
1091 initDatastoresWithCars(testName);
1093 // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
1094 final DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1095 preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1097 // Tx that is submitted after the leader transitions to IsolatedLeader.
1098 final DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1099 noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1101 // Tx that is submitted after the follower is reinstated.
1102 final DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1103 successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1105 // Stop the follower
// Watch the follower's shard manager first so its termination can be awaited after close().
1106 followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
1107 followerDistributedDataStore.close();
1108 followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
1110 // Submit the preIsolatedLeaderWriteTx so it's pending
1111 final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
1113 // Change the isolated leader check interval low so it changes to IsolatedLeader.
1114 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1115 .shardIsolatedLeaderCheckIntervalInMillis(200));
1117 MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
1118 raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
1120 final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
// NOTE(review): only the tell-based assignment of 'canCommit' is visible here; the null check further down
// implies the ask-based branch leaves it null - confirm against the full file.
1121 final ListenableFuture<Boolean> canCommit;
1123 // There is difference in behavior here:
1124 if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
1125 // ask-based canCommit() times out and aborts
1126 final var ex = assertThrows(ExecutionException.class,
1127 () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
1128 assertThat(ex, instanceOf(NoShardLeaderException.class));
1129 assertThat(ex.getMessage(), containsString(
1130 "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
1133 // tell-based canCommit() does not have a real timeout and hence continues
1134 canCommit = noShardLeaderCohort.canCommit();
// Sleeps for commitTimeout seconds, then checks that the canCommit future has still not completed.
1135 Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
1136 assertFalse(canCommit.isDone());
1139 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1140 .shardElectionTimeoutFactor(100));
1142 final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
// Re-create the follower datastore that was closed above.
1144 followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
1145 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
1147 leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
1148 leaderTestKit.doCommit(successTxCohort);
1150 // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
1151 if (canCommit != null) {
1152 final var ex = assertThrows(ExecutionException.class,
1153 () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
1154 assertThat(ex, instanceOf(OptimisticLockFailedException.class));
1155 assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
1156 final var cause = ex.getCause();
1157 assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
1158 final var cmae = (ConflictingModificationAppliedException) cause;
1159 assertEquals("Node was created by other transaction.", cmae.getMessage());
1160 assertEquals(CarsModel.BASE_PATH, cmae.getPath());
// Shuts down the leader's actor system after an initial read from the follower, then expects committing a new
// read-write transaction from the follower datastore to fail with an ExecutionException. When testParameter is
// a DistributedDataStore, the root cause must be NoShardLeaderException or the direct cause
// ShardLeaderNotRespondingException. A RequestTimeoutException root cause is asserted on the last line
// (presumably the else branch for other datastore implementations - the branch line is not visible here).
1165 public void testTransactionWithShardLeaderNotResponding() throws Exception {
1166 followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1167 followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
1168 initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
1170 // Do an initial read to get the primary shard info cached.
1172 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1173 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1175 // Shutdown the leader and try to create a new tx.
1177 TestKit.shutdownActorSystem(leaderSystem, true);
// Shorten the follower's operation timeout and election timeout factor before attempting the transaction.
1179 followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
1180 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
1182 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1184 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1186 final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready()));
// The full stack trace of the cause is embedded in the assertion message to ease diagnosis on failure.
1187 final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause());
1188 if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1189 assertTrue(msg, Throwables.getRootCause(ex) instanceof NoShardLeaderException
1190 || ex.getCause() instanceof ShardLeaderNotRespondingException);
1192 assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
// Shuts down the leader's actor system and makes member 1 leave the follower's cluster view, then expects
// committing a new read-write transaction from the follower datastore to fail with an ExecutionException.
// When testParameter is a DistributedDataStore, the root cause must be NoShardLeaderException. A
// RequestTimeoutException root cause is asserted on the last line (presumably the else branch for other
// datastore implementations - the branch line is not visible here).
1197 public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
1198 followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1199 initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
1201 // Do an initial read to get the primary shard info cached.
1203 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1204 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1206 // Shutdown the leader and try to create a new tx.
1208 TestKit.shutdownActorSystem(leaderSystem, true);
1210 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
// Short fixed pause before the datastore context is updated.
1212 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1214 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1215 .operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
1217 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1219 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1221 final var ex = assertThrows(ExecutionException.class, () -> followerTestKit.doCommit(rwTx.ready()));
// The full stack trace of the cause is embedded in the assertion message to ease diagnosis on failure.
1222 final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause());
1223 if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1224 assertThat(msg, Throwables.getRootCause(ex), instanceOf(NoShardLeaderException.class));
1226 assertThat(msg, Throwables.getRootCause(ex), instanceOf(RequestTimeoutException.class));
// Three-member variant of the leader-shutdown scenario: after the leader's actor system is shut down, a
// read-write transaction created on the follower datastore is expected to commit successfully (doCommit is
// called without any expected exception), in contrast to the two-member failure tests.
1231 public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
1232 followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
1233 String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
1234 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1236 final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1237 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1238 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1239 follower2System, follower2DatastoreContextBuilder, commitTimeout);
// 'ds' is not referenced in the visible body; it is held in try-with-resources so it gets closed on exit.
1241 try (AbstractDataStore ds =
1242 follower2TestKit.setupAbstractDataStore(
1243 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
1245 followerTestKit.waitForMembersUp("member-1", "member-3");
1246 follower2TestKit.waitForMembersUp("member-1", "member-2");
1248 // Do an initial read to get the primary shard info cached.
1250 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1251 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1253 // Shutdown the leader and try to create a new tx.
1255 TestKit.shutdownActorSystem(leaderSystem, true);
1257 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
1259 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1260 .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
1262 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1264 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1266 followerTestKit.doCommit(rwTx.ready());
// Shuts down the third member's actor system, waits until both remaining cluster views list it as unreachable,
// then sends RequestVote messages with higher terms (naming "member-3-shard-cars" as candidate) to the cars
// shards of the leader and the follower. The test asserts that the current term reported afterwards equals the
// term captured before the votes were sent.
1271 public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
1272 final String testName = "testSemiReachableCandidateNotDroppingLeader";
1273 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1275 final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1276 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1277 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1278 follower2System, follower2DatastoreContextBuilder, commitTimeout);
// NOTE(review): unlike sibling tests, ds2 is not opened in try-with-resources; no close() is visible in this
// view of the method - confirm it is released in the full file.
1280 final AbstractDataStore ds2 =
1281 follower2TestKit.setupAbstractDataStore(
1282 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
1284 followerTestKit.waitForMembersUp("member-1", "member-3");
1285 follower2TestKit.waitForMembersUp("member-1", "member-2");
1287 // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
1288 TestKit.shutdownActorSystem(follower2System, true);
// Capture the leader shard's raft state before any RequestVote is sent; it is the baseline for the asserts.
1290 ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
1291 final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1292 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1294 Cluster leaderCluster = Cluster.get(leaderSystem);
1295 Cluster followerCluster = Cluster.get(followerSystem);
1296 Cluster follower2Cluster = Cluster.get(follower2System);
1298 Member follower2Member = follower2Cluster.readView().self();
// Poll (up to 10 seconds each) until both surviving members report member 3 as unreachable.
1300 await().atMost(10, TimeUnit.SECONDS)
1301 .until(() -> containsUnreachable(leaderCluster, follower2Member));
1302 await().atMost(10, TimeUnit.SECONDS)
1303 .until(() -> containsUnreachable(followerCluster, follower2Member));
1305 ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
1307 // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
1308 // candidate, we can just send a couple of RequestVotes to both leader and follower.
1309 cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1310 followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1311 cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1312 followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1314 OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1315 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
// NOTE(review): this queries 'cars' (the leader's shard ref) again, not 'followerCars', so 'followerState'
// reflects the leader shard's state - looks like a copy/paste slip; confirm whether followerCars was intended.
1316 OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
1317 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1319 assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
1320 assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
// Reports whether the given member is currently in the cluster's set of unreachable members, as seen through
// cluster.readView(). Returns a boxed Boolean; the callers in this file use it as an Awaitility until() condition.
//   cluster - the cluster whose read view is consulted
//   member  - the member to look for among the unreachable members
1325 private static Boolean containsUnreachable(final Cluster cluster, final Member member) {
1326 // unreachableMembers() returns scala.collection.immutable.Set, but we are using scala.collection.Set to fix JDT
1327 // see https://bugs.eclipse.org/bugs/show_bug.cgi?id=468276#c32
// Keep the explicitly typed local: inlining the call would defeat the workaround described above.
1328 final Set<Member> members = cluster.readView().unreachableMembers();
1329 return members.contains(member);
// Seeds the leader's cars shard with a saved snapshot before the datastores start, then verifies that the
// leader datastore serves the snapshot's cars data and that a snapshot matching the initial one is saved for
// both the leader and the follower cars shards.
1333 public void testInstallSnapshot() throws Exception {
1334 final String testName = "testInstallSnapshot";
1335 final String leaderCarShardName = "member-1-shard-cars-" + testName;
1336 final String followerCarShardName = "member-2-shard-cars-" + testName;
1338 // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
1339 // install a snapshot to sync the follower.
1341 DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
1342 SchemaContextHelper.full());
1344 final ContainerNode carsNode = CarsModel.newCarsNode(
1345 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000))));
1346 AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
// The snapshot payload is the whole tree root, read back after the cars node was written.
1348 final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.of());
1349 final Snapshot initialSnapshot = Snapshot.create(
1350 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
1351 Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
1352 InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
// Latches must be registered before the datastores start so the later waitForSavedSnapshot() calls can
// observe the saves.
1354 InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
1355 InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
1357 initDatastoresWithCars(testName);
1359 assertEquals(Optional.of(carsNode), leaderDistributedDataStore.newReadOnlyTransaction().read(
1360 CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
1362 verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
1363 initialSnapshot, snapshotRoot);
1365 verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
1366 initialSnapshot, snapshotRoot);
// With the maximum message slice size set to 100 on both datastores, writes the CarsModel.create() node through
// a read-write transaction on the follower datastore and verifies it can be read back through the same
// transaction. Skipped (via assumeTrue) unless testParameter is a ClientBackedDataStore.
1370 public void testReadWriteMessageSlicing() throws Exception {
1371 // The slicing is only implemented for tell-based protocol
1372 assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
1374 leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
1375 followerDatastoreContextBuilder.maximumMessageSliceSize(100);
// NOTE(review): the name passed here ("testLargeReadReplySlicing") differs from this method's name, unlike
// the other tests in this file - harmless if unique, but confirm it is intentional.
1376 initDatastoresWithCars("testLargeReadReplySlicing");
1378 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1380 final NormalizedNode carsNode = CarsModel.create();
1381 rwTx.write(CarsModel.BASE_PATH, carsNode);
1383 verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
/**
 * Drives a commit on the leader while the cars shards of members 2 and 3 are told to drop
 * {@code AppendEntries}, then sends the leader a {@code RequestVote} carrying a term one higher than its
 * current term. After both followers are told to stop dropping, the commit running on the background
 * executor must complete within ten seconds.
 *
 * @throws Exception on any failure in the steps below
 */
1386 @SuppressWarnings("IllegalCatch")
1388 public void testRaftCallbackDuringLeadershipDrop() throws Exception {
1389 final String testName = "testRaftCallbackDuringLeadershipDrop";
1390 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
// The second commit is submitted to this executor so the test thread can carry on while it is outstanding.
1392 final ExecutorService executor = Executors.newSingleThreadExecutor();
// Test kit for the third member: its context is copied from the follower builder, with a 500 ms operation
// timeout and a 3600 s shard leader election timeout.
// NOTE(review): the long election timeout presumably keeps this member from starting an election - confirm.
1394 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
1395 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
1396 .shardLeaderElectionTimeoutInSeconds(3600),
// Commit an empty cars container through the leader before the third member's datastore is set up.
1399 final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1400 initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1401 leaderTestKit.doCommit(initialWriteTx.ready());
1403 try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
1404 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
// Look up the local "cars" shard actor of each follower and tell both to start dropping AppendEntries.
1406 final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
1407 .getLocalShards().get("cars").getActor();
1408 final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
1409 .getLocalShards().get("cars").getActor();
1410 member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
1411 member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
// Prepare a write of the car list on the leader; its commit is issued from the executor thread.
1413 final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1414 newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
// submitDone flips to true only after doCommit returns without throwing.
1415 final AtomicBoolean submitDone = new AtomicBoolean(false);
// The catch of Exception inside this task is presumably what the IllegalCatch suppression above is for.
1416 executor.submit(() -> {
1418 leaderTestKit.doCommit(newTx.ready());
1419 submitDone.set(true);
1420 } catch (Exception e) {
1421 throw new RuntimeException(e);
// Poll the leader's on-demand raft state for up to 10 s until its last index is at least 1, then read the
// state once more to obtain the current term used in the RequestVote below.
1424 final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
1425 .getLocalShards().get("cars").getActor();
1426 await().atMost(10, TimeUnit.SECONDS)
1427 .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1428 .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
1430 final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
1431 .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
1433 // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
1434 // new term(switching to candidate after election timeout)
1435 leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
1436 "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
// Tell both followers to stop dropping AppendEntries again.
1439 member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
1440 member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
// The background commit must have completed within 10 s; the alias names the failure mode being guarded.
1442 await("Is tx stuck in COMMIT_PENDING")
1443 .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
// NOTE(review): no finally is visible around this call; if one of the awaits above times out the executor
// may never be shut down - confirm and consider try/finally.
1447 executor.shutdownNow();
// With snapshotOnRootOverwrite enabled on both members, checks when the cars shard ends up with a stored
// snapshot: after each write of the whole root (expected last-applied index 1, later 12), but not after the
// ten individual car writes in between, which only move the snapshot index reported by the shard state.
1451 public void testSnapshotOnRootOverwrite() throws Exception {
1452 initDatastores("testSnapshotOnRootOverwrite", "module-shards-default-cars-member1-and-2.conf",
1453 new String[] {"cars", "default"},
1454 leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
1455 followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
1457 leaderTestKit.waitForMembersUp("member-2");
// A container identified by SchemaContext.NAME that holds the cars model; it is written at the empty
// (root) path, first here and once more near the end of the test.
1458 final ContainerNode rootNode = Builders.containerBuilder()
1459 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
1460 .withChild(CarsModel.create())
1463 leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.of(), rootNode);
1465 // FIXME: CONTROLLER-2020: ClientBackedDatastore does not have stable indexes/term,
1466 // the snapshot index seems to fluctuate
// Every index check from here on is skipped unless testParameter is a DistributedDataStore type.
1467 assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
// After the first root write both members must report snapshot index 1 for the cars shard ...
1468 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
1469 state -> assertEquals(1, state.getSnapshotIndex()));
1471 IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
1472 state -> assertEquals(1, state.getSnapshotIndex()));
// ... and the snapshot stored for each member's cars shard must carry last-applied index 1.
1474 verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1);
1475 verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1);
// Ten ordinary writes ("car 0" .. "car 9"), each to its own car path rather than to the root.
1477 for (int i = 0; i < 10; i++) {
1478 leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i),
1479 CarsModel.newCarEntry("car " + i, Uint64.ONE));
1482 // fake snapshot causes the snapshotIndex to move
1483 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
1484 state -> assertEquals(10, state.getSnapshotIndex()));
1485 IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
1486 state -> assertEquals(10, state.getSnapshotIndex()));
1488 // however the real snapshot still has not changed and was taken at index 1
1489 verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1);
1490 verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1);
1492 // root overwrite so expect a snapshot
1493 leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.of(), rootNode);
1495 // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1)
1496 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
1497 state -> assertEquals(12, state.getSnapshotIndex()));
1498 IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
1499 state -> assertEquals(12, state.getSnapshotIndex()));
// The stored snapshot on both members must now have moved from last-applied index 1 to 12.
1501 verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12);
1502 verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12);
/**
 * Waits up to five seconds until {@link InMemorySnapshotStore} holds exactly one {@link Snapshot} for the
 * given persistence id and that snapshot reports the expected last-applied index.
 *
 * @param persistenceId id under which the snapshots are looked up in the in-memory store
 * @param lastAppliedIndex value the single stored snapshot must return from {@code getLastAppliedIndex()}
 */
1505 private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex) {
// The assertions are retried by Awaitility until they pass or the five-second limit is reached.
1506 await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
1507 List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
1508 assertEquals(1, snap.size());
1509 assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
/**
 * Asserts that {@code actual} agrees with {@code expected} on last-applied term, last-applied index, last
 * term and last index, that its state is exactly a {@link ShardSnapshotState}, and that the root node of
 * the {@link MetadataShardDataTreeSnapshot} held by that state equals {@code expRoot}.
 *
 * @param actual snapshot under test
 * @param expected snapshot supplying the expected term and index values
 * @param expRoot root node the snapshot's data tree is expected to contain
 */
1514 private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
1515 final NormalizedNode expRoot) {
1516 assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
1517 assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
1518 assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
1519 assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
// The state class is asserted first, so a wrong type fails as an assertion before the cast below runs.
1520 assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
1521 MetadataShardDataTreeSnapshot shardSnapshot =
1522 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
// orElseThrow() makes an absent root node fail the check instead of being compared as an empty value.
1523 assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
/**
 * Hands {@code dataStore} a mocked {@link DatastoreContextFactory} via {@code onDatastoreContextUpdated};
 * the mock answers both the base-context and the per-shard-context lookup with a context built from a copy
 * of {@code builder}.
 *
 * @param dataStore data store that receives the context update
 * @param builder builder whose current settings form the context handed out by the mocked factory
 */
1526 private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
// Work from a builder derived from builder.build() as it stands now.
// NOTE(review): presumably this isolates the answers from later changes to the caller's builder - confirm.
1527 final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
1528 final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
// One shared answer: every invocation builds a context from the copied builder.
1529 final Answer<DatastoreContext> answer = invocation -> newBuilder.build();
1530 doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
// The per-shard lookup is stubbed for any string argument.
1531 doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(anyString());
1532 dataStore.onDatastoreContextUpdated(mockContextFactory);