2 * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.AddressFromURIString;
26 import akka.cluster.Cluster;
27 import akka.cluster.Member;
28 import akka.dispatch.Futures;
29 import akka.pattern.Patterns;
30 import akka.testkit.javadsl.TestKit;
31 import com.google.common.base.Stopwatch;
32 import com.google.common.base.Throwables;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.collect.Range;
35 import com.google.common.primitives.UnsignedLong;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.MoreExecutors;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import com.typesafe.config.ConfigFactory;
40 import java.math.BigInteger;
41 import java.util.Arrays;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.Iterator;
45 import java.util.LinkedList;
46 import java.util.List;
47 import java.util.Optional;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicLong;
52 import java.util.function.Supplier;
53 import org.junit.After;
54 import org.junit.Assume;
55 import org.junit.Before;
56 import org.junit.Ignore;
57 import org.junit.Test;
58 import org.junit.runner.RunWith;
59 import org.junit.runners.Parameterized;
60 import org.junit.runners.Parameterized.Parameter;
61 import org.junit.runners.Parameterized.Parameters;
62 import org.mockito.Mockito;
63 import org.mockito.stubbing.Answer;
64 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
65 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
66 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
67 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
68 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
69 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
70 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
71 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
72 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
73 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
74 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
75 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
76 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
77 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
78 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
80 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
81 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
82 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
83 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
84 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
85 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
86 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
87 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
90 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
91 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
92 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
93 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
94 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
95 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
96 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
97 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
98 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
99 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
100 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
101 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
102 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
103 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
104 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
105 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
106 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
107 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
116 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
117 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
118 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
119 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
120 import scala.concurrent.Await;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.FiniteDuration;
125 * End-to-end distributed data store tests that exercise remote shards and transactions.
127 * @author Thomas Pantelis
129 @RunWith(Parameterized.class)
130 public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
132 @Parameters(name = "{0}")
133 public static Collection<Object[]> data() {
134 return Arrays.asList(new Object[][] {
135 { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
140 public Class<? extends AbstractDataStore> testParameter;
142 public int commitTimeout;
144 private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
145 private static final String[] CARS = {"cars"};
147 private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
148 "akka://cluster-test@127.0.0.1:2558");
149 private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
150 "akka://cluster-test@127.0.0.1:2559");
152 private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
153 private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
154 private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
155 private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
157 private ActorSystem leaderSystem;
158 private ActorSystem followerSystem;
159 private ActorSystem follower2System;
161 private final DatastoreContext.Builder leaderDatastoreContextBuilder =
162 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
164 private final DatastoreContext.Builder followerDatastoreContextBuilder =
165 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
166 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
167 private final TransactionIdentifier tx1 = nextTransactionId();
168 private final TransactionIdentifier tx2 = nextTransactionId();
170 private AbstractDataStore followerDistributedDataStore;
171 private AbstractDataStore leaderDistributedDataStore;
172 private IntegrationTestKit followerTestKit;
173 private IntegrationTestKit leaderTestKit;
176 public void setUp() {
177 InMemoryJournal.clear();
178 InMemorySnapshotStore.clear();
180 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
181 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
183 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
184 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
186 follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
187 Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
191 public void tearDown() {
192 if (followerDistributedDataStore != null) {
193 leaderDistributedDataStore.close();
195 if (leaderDistributedDataStore != null) {
196 leaderDistributedDataStore.close();
199 TestKit.shutdownActorSystem(leaderSystem);
200 TestKit.shutdownActorSystem(followerSystem);
201 TestKit.shutdownActorSystem(follower2System);
203 InMemoryJournal.clear();
204 InMemorySnapshotStore.clear();
207 private void initDatastoresWithCars(final String type) throws Exception {
208 initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
211 private void initDatastoresWithCarsAndPeople(final String type) throws Exception {
212 initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
215 private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
217 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);
219 leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
220 testParameter, type, moduleShardsConfig, false, shards);
222 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
223 followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
224 testParameter, type, moduleShardsConfig, false, shards);
226 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
228 leaderTestKit.waitForMembersUp("member-2");
229 followerTestKit.waitForMembersUp("member-1");
232 private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
234 final Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
235 assertTrue("isPresent", optional.isPresent());
237 final CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
238 CarsModel.CAR_QNAME);
239 for (final NormalizedNode<?, ?> entry: entries) {
240 listBuilder.withChild((MapEntryNode) entry);
243 assertEquals("Car list node", listBuilder.build(), optional.get());
246 private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
247 final NormalizedNode<?, ?> expNode) throws Exception {
248 final Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
249 assertTrue("isPresent", optional.isPresent());
250 assertEquals("Data node", expNode, optional.get());
253 private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
255 final Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
256 assertEquals("exists", Boolean.TRUE, exists);
260 public void testWriteTransactionWithSingleShard() throws Exception {
261 final String testName = "testWriteTransactionWithSingleShard";
262 initDatastoresWithCars(testName);
264 final String followerCarShardName = "member-2-shard-cars-" + testName;
266 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
267 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
269 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
270 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
272 final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
273 final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
274 writeTx.merge(car1Path, car1);
276 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
277 final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
278 writeTx.merge(car2Path, car2);
280 followerTestKit.doCommit(writeTx.ready());
282 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
284 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
288 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
290 writeTx.delete(car1Path);
292 followerTestKit.doCommit(writeTx.ready());
294 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
296 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
298 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
300 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
302 // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
303 // applied and all the log entries from the leader. Since we've verified the car data above we know that
304 // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
305 final AtomicLong leaderLastAppliedIndex = new AtomicLong();
306 IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
307 state -> leaderLastAppliedIndex.set(state.getLastApplied()));
309 // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
310 // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
311 // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
312 // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
313 // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
314 // until we find the one that encompasses the leader's lastAppliedIndex.
315 Stopwatch sw = Stopwatch.createStarted();
316 boolean done = false;
318 final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
319 ApplyJournalEntries.class);
320 for (ApplyJournalEntries aje: entries) {
321 if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
327 assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
328 + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
330 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
333 TestKit.shutdownActorSystem(leaderSystem, true);
334 TestKit.shutdownActorSystem(followerSystem, true);
336 final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
338 try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
340 .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
341 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
346 public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
347 final String testName = "testWriteTransactionWithSingleShard";
348 initDatastoresWithCars(testName);
350 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
352 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
353 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
354 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
355 followerTestKit.doCommit(writeTx.ready());
358 for (int i = 0; i < numCars; i++) {
359 writeTx = txChain.newWriteOnlyTransaction();
360 writeTx.write(CarsModel.newCarPath("car" + i),
361 CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
363 followerTestKit.doCommit(writeTx.ready());
365 DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
366 domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
368 domStoreReadTransaction.close();
371 // wait to let the shard catch up with purged
372 await("Range set leak test").atMost(5, TimeUnit.SECONDS)
373 .pollInterval(500, TimeUnit.MILLISECONDS)
374 .untilAsserted(() -> {
375 Optional<ActorRef> localShard =
376 leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
377 FrontendShardDataTreeSnapshotMetadata frontendMetadata =
378 (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
379 .executeOperation(localShard.get(), new RequestFrontendMetadata());
381 if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
382 Iterator<FrontendHistoryMetadata> iterator =
383 frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
384 FrontendHistoryMetadata metadata = iterator.next();
385 while (iterator.hasNext() && metadata.getHistoryId() != 1) {
386 metadata = iterator.next();
389 assertEquals(0, metadata.getClosedTransactions().size());
390 assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
391 metadata.getPurgedTransactions().asRanges().iterator().next());
393 // ask based should track no metadata
394 assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
398 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
399 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
400 assertTrue("isPresent", optional.isPresent());
401 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
405 @Ignore("Flushes out tell based leak needs to be handled separately")
406 public void testCloseTransactionMetadataLeak() throws Exception {
407 // Ask based frontend seems to have some issues with back to back close
408 Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
410 final String testName = "testWriteTransactionWithSingleShard";
411 initDatastoresWithCars(testName);
413 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
415 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
416 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
417 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
418 followerTestKit.doCommit(writeTx.ready());
421 for (int i = 0; i < numCars; i++) {
422 writeTx = txChain.newWriteOnlyTransaction();
425 DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
426 domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
428 domStoreReadTransaction.close();
431 writeTx = txChain.newWriteOnlyTransaction();
432 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
433 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
434 followerTestKit.doCommit(writeTx.ready());
436 // wait to let the shard catch up with purged
437 await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
438 .pollInterval(500, TimeUnit.MILLISECONDS)
439 .untilAsserted(() -> {
440 Optional<ActorRef> localShard =
441 leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
442 FrontendShardDataTreeSnapshotMetadata frontendMetadata =
443 (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
444 .executeOperation(localShard.get(), new RequestFrontendMetadata());
446 if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
447 Iterator<FrontendHistoryMetadata> iterator =
448 frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
449 FrontendHistoryMetadata metadata = iterator.next();
450 while (iterator.hasNext() && metadata.getHistoryId() != 1) {
451 metadata = iterator.next();
454 Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
456 assertEquals(0, metadata.getClosedTransactions().size());
457 assertEquals(1, ranges.size());
459 // ask based should track no metadata
460 assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
464 final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
465 .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
466 assertTrue("isPresent", optional.isPresent());
467 assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
471 public void testReadWriteTransactionWithSingleShard() throws Exception {
472 initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
474 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
475 assertNotNull("newReadWriteTransaction returned null", rwTx);
477 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
478 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
480 final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
481 rwTx.merge(CarsModel.newCarPath("optima"), car1);
483 verifyCars(rwTx, car1);
485 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
486 final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
487 rwTx.merge(car2Path, car2);
489 verifyExists(rwTx, car2Path);
491 followerTestKit.doCommit(rwTx.ready());
493 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
497 public void testWriteTransactionWithMultipleShards() throws Exception {
498 initDatastoresWithCarsAndPeople("testWriteTransactionWithMultipleShards");
500 final DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
501 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
503 final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
504 final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
505 writeTx.write(carsPath, carsNode);
507 final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
508 final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
509 writeTx.write(peoplePath, peopleNode);
511 followerTestKit.doCommit(writeTx.ready());
513 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
515 verifyNode(readTx, carsPath, carsNode);
516 verifyNode(readTx, peoplePath, peopleNode);
520 public void testReadWriteTransactionWithMultipleShards() throws Exception {
521 initDatastoresWithCarsAndPeople("testReadWriteTransactionWithMultipleShards");
523 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
524 assertNotNull("newReadWriteTransaction returned null", rwTx);
526 final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
527 final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
528 rwTx.write(carsPath, carsNode);
530 final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
531 final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
532 rwTx.write(peoplePath, peopleNode);
534 followerTestKit.doCommit(rwTx.ready());
536 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
538 verifyNode(readTx, carsPath, carsNode);
539 verifyNode(readTx, peoplePath, peopleNode);
543 public void testTransactionChainWithSingleShard() throws Exception {
544 initDatastoresWithCars("testTransactionChainWithSingleShard");
546 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
548 // Add the top-level cars container with write-only.
550 final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
551 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
553 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
555 final DOMStoreThreePhaseCommitCohort writeTxReady = writeTx.ready();
557 // Verify the top-level cars container with read-only.
559 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
561 // Perform car operations with read-write.
563 final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
565 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
567 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
569 final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
570 final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
571 rwTx.write(car1Path, car1);
573 verifyExists(rwTx, car1Path);
575 verifyCars(rwTx, car1);
577 final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
578 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
580 rwTx.delete(car1Path);
582 followerTestKit.doCommit(writeTxReady);
584 followerTestKit.doCommit(rwTx.ready());
588 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
592 public void testTransactionChainWithMultipleShards() throws Exception {
593 initDatastoresWithCarsAndPeople("testTransactionChainWithMultipleShards");
595 final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
597 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
598 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
600 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
601 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
603 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
604 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
606 followerTestKit.doCommit(writeTx.ready());
608 final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
610 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
611 final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
612 readWriteTx.write(carPath, car);
614 final MapEntryNode person = PeopleModel.newPersonEntry("jack");
615 final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
616 readWriteTx.merge(personPath, person);
618 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
619 assertTrue("isPresent", optional.isPresent());
620 assertEquals("Data node", car, optional.get());
622 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
623 assertTrue("isPresent", optional.isPresent());
624 assertEquals("Data node", person, optional.get());
626 final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
628 writeTx = txChain.newWriteOnlyTransaction();
630 writeTx.delete(personPath);
632 final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
634 followerTestKit.doCommit(cohort2);
635 followerTestKit.doCommit(cohort3);
639 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
640 verifyCars(readTx, car);
642 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
643 assertFalse("isPresent", optional.isPresent());
647 public void testChainedTransactionFailureWithSingleShard() throws Exception {
648 initDatastoresWithCars("testChainedTransactionFailureWithSingleShard");
650 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
651 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
652 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
653 MoreExecutors.directExecutor());
655 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
656 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
658 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
660 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
661 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
662 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
664 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
667 writeTx.commit().get(5, TimeUnit.SECONDS);
668 fail("Expected TransactionCommitFailedException");
669 } catch (final ExecutionException e) {
673 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
680 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
681 initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
683 final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
684 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
685 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
686 MoreExecutors.directExecutor());
688 final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
689 final DOMTransactionChain txChain = broker.createTransactionChain(listener);
691 final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
693 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
695 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
696 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
697 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
699 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
700 // done for put for performance reasons.
701 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
704 writeTx.commit().get(5, TimeUnit.SECONDS);
705 fail("Expected TransactionCommitFailedException");
706 } catch (final ExecutionException e) {
710 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
717 public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
718 followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
719 final String testName = "testSingleShardTransactionsWithLeaderChanges";
720 initDatastoresWithCars(testName);
722 final String followerCarShardName = "member-2-shard-cars-" + testName;
723 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class);
725 // Write top-level car container from the follower so it uses a remote Tx.
727 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
729 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
730 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
732 followerTestKit.doCommit(writeTx.ready());
734 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
736 // Switch the leader to the follower
738 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
739 .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
741 TestKit.shutdownActorSystem(leaderSystem, true);
742 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
744 followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
746 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
747 Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
749 final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
750 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
751 IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
753 try (AbstractDataStore ds =
754 newMember1TestKit.setupAbstractDataStore(
755 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
757 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
759 // Write a car entry to the new leader - should switch to local Tx
761 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
763 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
764 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
765 writeTx.merge(car1Path, car1);
767 followerTestKit.doCommit(writeTx.ready());
769 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
773 @SuppressWarnings("unchecked")
775 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
776 initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
777 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
779 final Optional<ActorRef> carsFollowerShard =
780 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
781 assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
783 final DataTree dataTree = new InMemoryDataTreeFactory().create(
784 DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
786 // Send a tx with immediate commit.
788 DataTreeModification modification = dataTree.takeSnapshot().newModification();
789 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
790 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
792 final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
793 new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
794 modification.ready();
796 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
798 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
799 Object resp = followerTestKit.expectMsgClass(Object.class);
800 if (resp instanceof akka.actor.Status.Failure) {
801 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
804 assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
806 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
808 // Send another tx without immediate commit.
810 modification = dataTree.takeSnapshot().newModification();
811 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
812 new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
813 modification.ready();
815 readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
817 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
818 resp = followerTestKit.expectMsgClass(Object.class);
819 if (resp instanceof akka.actor.Status.Failure) {
820 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
823 assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
825 final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
826 ((ReadyTransactionReply)resp).getCohortPath());
828 final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
829 Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
830 ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
831 leaderDistributedDataStore.getActorUtils(), Arrays.asList(
832 new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
833 cohort.canCommit().get(5, TimeUnit.SECONDS);
834 cohort.preCommit().get(5, TimeUnit.SECONDS);
835 cohort.commit().get(5, TimeUnit.SECONDS);
837 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
840 @SuppressWarnings("unchecked")
842 public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
843 initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
844 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
846 final Optional<ActorRef> carsFollowerShard =
847 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
848 assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
850 carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
851 final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
853 // Send a tx with immediate commit.
855 DataTreeModification modification = dataTree.takeSnapshot().newModification();
856 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
857 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
859 final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
860 new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
862 ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
863 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
864 Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
867 carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
868 Object resp = followerTestKit.expectMsgClass(Object.class);
869 if (resp instanceof akka.actor.Status.Failure) {
870 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
873 assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
875 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
877 // Send another tx without immediate commit.
879 modification = dataTree.takeSnapshot().newModification();
880 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
881 new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
883 forwardedReady = new ForwardedReadyTransaction(tx2,
884 DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
885 Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
888 carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
889 resp = followerTestKit.expectMsgClass(Object.class);
890 if (resp instanceof akka.actor.Status.Failure) {
891 throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
894 assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
896 ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
897 ((ReadyTransactionReply)resp).getCohortPath());
899 final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
900 Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
901 final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
902 leaderDistributedDataStore.getActorUtils(), Arrays.asList(
903 new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
904 cohort.canCommit().get(5, TimeUnit.SECONDS);
905 cohort.preCommit().get(5, TimeUnit.SECONDS);
906 cohort.commit().get(5, TimeUnit.SECONDS);
908 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
912 public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
913 // FIXME: remove when test passes also for ClientBackedDataStore
914 Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
915 followerDatastoreContextBuilder.shardBatchedModificationCount(2);
916 leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
917 initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
919 // Do an initial write to get the primary shard info cached.
921 final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
922 initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
923 initialWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
924 followerTestKit.doCommit(initialWriteTx.ready());
926 // Wait for the commit to be replicated to the follower.
928 MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
929 raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
931 MemberNode.verifyRaftState(followerDistributedDataStore, "people",
932 raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
934 // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
937 final DOMStoreWriteTransaction writeTx1 = followerDistributedDataStore.newWriteOnlyTransaction();
938 writeTx1.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
939 writeTx1.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
940 final DOMStoreThreePhaseCommitCohort writeTx1Cohort = writeTx1.ready();
941 final ListenableFuture<Boolean> writeTx1CanCommit = writeTx1Cohort.canCommit();
942 writeTx1CanCommit.get(5, TimeUnit.SECONDS);
944 // Prepare and ready another WO tx that writes to 2 shards but don't canCommit yet. This will be queued
945 // in the leader shard.
947 final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
948 final LinkedList<MapEntryNode> cars = new LinkedList<>();
950 cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
951 writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
953 NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
954 .withChild(PeopleModel.newPersonEntry("Dude")).build();
955 writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
956 final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
958 // Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
959 // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
960 // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
963 final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
964 for (int i = 1; i <= 5; i++, carIndex++) {
965 cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
966 writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
969 // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
972 final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
973 cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
974 writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
977 // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
978 // leader shard on ready.
980 final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
981 cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
982 readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
984 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
985 stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
987 // Disable elections on the leader so it switches to follower.
989 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
990 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
991 .shardElectionTimeoutFactor(10));
993 leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
995 // Submit all tx's - the messages should get queued for retry.
997 final ListenableFuture<Boolean> writeTx2CanCommit = writeTx2Cohort.canCommit();
998 final DOMStoreThreePhaseCommitCohort writeTx3Cohort = writeTx3.ready();
999 final DOMStoreThreePhaseCommitCohort writeTx4Cohort = writeTx4.ready();
1000 final DOMStoreThreePhaseCommitCohort rwTxCohort = readWriteTx.ready();
1002 // Enable elections on the other follower so it becomes the leader, at which point the
1003 // tx's should get forwarded from the previous leader to the new leader to complete the commits.
1005 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1006 .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
1007 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
1008 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1009 IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
1010 .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
1012 followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
1013 followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
1014 followerTestKit.doCommit(writeTx3Cohort);
1015 followerTestKit.doCommit(writeTx4Cohort);
1016 followerTestKit.doCommit(rwTxCohort);
1018 DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
1019 verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
1020 verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
1024 public void testLeadershipTransferOnShutdown() throws Exception {
1025 // FIXME: remove when test passes also for ClientBackedDataStore
1026 Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
1027 leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
1028 followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
1029 final String testName = "testLeadershipTransferOnShutdown";
1030 initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
1032 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
1033 DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
1035 try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
1036 testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
1038 followerTestKit.waitForMembersUp("member-3");
1039 follower2TestKit.waitForMembersUp("member-1", "member-2");
1041 // Create and submit a couple tx's so they're pending.
1043 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1044 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1045 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
1046 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1047 final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
1049 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1050 stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
1052 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
1053 final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
1054 writeTx.write(CarsModel.newCarPath("optima"), car);
1055 final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
1057 IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
1058 stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
1060 // Gracefully stop the leader via a Shutdown message.
1062 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1063 .shardElectionTimeoutFactor(100));
1065 final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
1066 final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
1067 final ActorRef leaderActor = Await.result(future, duration);
1069 final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
1071 // Commit the 2 transactions. They should finish and succeed.
1073 followerTestKit.doCommit(cohort1);
1074 followerTestKit.doCommit(cohort2);
1076 // Wait for the leader actor stopped.
1078 final Boolean stopped = Await.result(stopFuture, duration);
1079 assertEquals("Stopped", Boolean.TRUE, stopped);
1081 // Verify leadership was transferred by reading the committed data from the other nodes.
1083 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
1084 verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
1089 public void testTransactionWithIsolatedLeader() throws Exception {
1090 // FIXME: remove when test passes also for ClientBackedDataStore
1091 Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
1092 // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
1093 leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
1094 final String testName = "testTransactionWithIsolatedLeader";
1095 initDatastoresWithCars(testName);
1097 // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
1098 final DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1099 preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1101 // Tx that is submitted after the leader transitions to IsolatedLeader.
1102 final DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1103 noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1105 // Tx that is submitted after the follower is reinstated.
1106 final DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
1107 successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1109 // Stop the follower
1110 followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
1111 followerDistributedDataStore.close();
1112 followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
1114 // Submit the preIsolatedLeaderWriteTx so it's pending
1115 final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
1117 // Change the isolated leader check interval low so it changes to IsolatedLeader.
1118 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1119 .shardIsolatedLeaderCheckIntervalInMillis(200));
1121 MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
1122 raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
1125 leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
1126 fail("Expected NoShardLeaderException");
1127 } catch (final ExecutionException e) {
1128 assertEquals("getCause", NoShardLeaderException.class, Throwables.getRootCause(e).getClass());
1131 sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
1132 .shardElectionTimeoutFactor(100));
1134 final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
1136 followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
1137 testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
1139 leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
1140 leaderTestKit.doCommit(successTxCohort);
1144 public void testTransactionWithShardLeaderNotResponding() throws Exception {
1145 followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1146 followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
1147 initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
1149 // Do an initial read to get the primary shard info cached.
1151 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1152 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1154 // Shutdown the leader and try to create a new tx.
1156 TestKit.shutdownActorSystem(leaderSystem, true);
1158 followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
1159 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
1161 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1163 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1166 followerTestKit.doCommit(rwTx.ready());
1167 fail("Exception expected");
1168 } catch (final ExecutionException e) {
1169 final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
1170 if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1171 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
1172 || e.getCause() instanceof ShardLeaderNotRespondingException);
1174 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
1180 public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
1181 followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
1182 initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
1184 // Do an initial read to get the primary shard info cached.
1186 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1187 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1189 // Shutdown the leader and try to create a new tx.
1191 TestKit.shutdownActorSystem(leaderSystem, true);
1193 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
1195 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1197 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1198 .operationTimeoutInMillis(10).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
1200 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1202 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1205 followerTestKit.doCommit(rwTx.ready());
1206 fail("Exception expected");
1207 } catch (final ExecutionException e) {
1208 final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
1209 if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
1210 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
1212 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
1218 public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
1219 followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
1220 String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
1221 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1223 final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1224 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1225 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1226 follower2System, follower2DatastoreContextBuilder, commitTimeout);
1228 try (AbstractDataStore ds =
1229 follower2TestKit.setupAbstractDataStore(
1230 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
1232 followerTestKit.waitForMembersUp("member-1", "member-3");
1233 follower2TestKit.waitForMembersUp("member-1", "member-2");
1235 // Do an initial read to get the primary shard info cached.
1237 final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
1238 readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1240 // Shutdown the leader and try to create a new tx.
1242 TestKit.shutdownActorSystem(leaderSystem, true);
1244 Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
1246 sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
1247 .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
1249 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1251 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
1253 followerTestKit.doCommit(rwTx.ready());
1258 public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
1259 final String testName = "testSemiReachableCandidateNotDroppingLeader";
1260 initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
1262 final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
1263 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
1264 final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
1265 follower2System, follower2DatastoreContextBuilder, commitTimeout);
1267 final AbstractDataStore ds2 =
1268 follower2TestKit.setupAbstractDataStore(
1269 testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
1271 followerTestKit.waitForMembersUp("member-1", "member-3");
1272 follower2TestKit.waitForMembersUp("member-1", "member-2");
1274 TestKit.shutdownActorSystem(follower2System);
1276 ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
1277 OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1278 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1280 Cluster leaderCluster = Cluster.get(leaderSystem);
1281 Cluster followerCluster = Cluster.get(followerSystem);
1282 Cluster follower2Cluster = Cluster.get(follower2System);
1284 Member follower2Member = follower2Cluster.readView().self();
1286 await().atMost(10, TimeUnit.SECONDS)
1287 .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
1288 await().atMost(10, TimeUnit.SECONDS)
1289 .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
1291 ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
1293 // to simulate a follower not being able to receive messages, but still being able to send messages and becoming
1294 // candidate, we can just send a couple of RequestVotes to both leader and follower.
1295 cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1296 followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
1297 cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1298 followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
1300 OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
1301 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1302 OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
1303 .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
1305 assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
1306 assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
1312 public void testInstallSnapshot() throws Exception {
1313 final String testName = "testInstallSnapshot";
1314 final String leaderCarShardName = "member-1-shard-cars-" + testName;
1315 final String followerCarShardName = "member-2-shard-cars-" + testName;
1317 // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
1318 // install a snapshot to sync the follower.
1320 DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
1321 SchemaContextHelper.full());
1323 final ContainerNode carsNode = CarsModel.newCarsNode(
1324 CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
1325 AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
1327 final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
1328 final Snapshot initialSnapshot = Snapshot.create(
1329 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
1330 Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
1331 InMemorySnapshotStore.addSnapshot(leaderCarShardName, initialSnapshot);
1333 InMemorySnapshotStore.addSnapshotSavedLatch(leaderCarShardName);
1334 InMemorySnapshotStore.addSnapshotSavedLatch(followerCarShardName);
1336 initDatastoresWithCars(testName);
1338 final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
1339 CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
1340 assertTrue("isPresent", readOptional.isPresent());
1341 assertEquals("Node", carsNode, readOptional.get());
1343 verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(leaderCarShardName, Snapshot.class),
1344 initialSnapshot, snapshotRoot);
1346 verifySnapshot(InMemorySnapshotStore.waitForSavedSnapshot(followerCarShardName, Snapshot.class),
1347 initialSnapshot, snapshotRoot);
1351 public void testReadWriteMessageSlicing() throws Exception {
1352 // The slicing is only implemented for tell-based protocol
1353 Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
1355 leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
1356 followerDatastoreContextBuilder.maximumMessageSliceSize(100);
1357 initDatastoresWithCars("testLargeReadReplySlicing");
1359 final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
1361 final NormalizedNode<?, ?> carsNode = CarsModel.create();
1362 rwTx.write(CarsModel.BASE_PATH, carsNode);
1364 verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
1367 private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
1368 final NormalizedNode<?, ?> expRoot) {
1369 assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
1370 assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
1371 assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());
1372 assertEquals("Snapshot getLastIndex", expected.getLastIndex(), actual.getLastIndex());
1373 assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
1374 MetadataShardDataTreeSnapshot shardSnapshot =
1375 (MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
1376 assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
1379 private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
1380 final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
1381 final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
1382 final Answer<DatastoreContext> answer = invocation -> newBuilder.build();
1383 Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
1384 Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
1385 dataStore.onDatastoreContextUpdated(mockContextFactory);