package org.opendaylight.controller.cluster.datastore;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
-import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.junit.After;
import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
+import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
+import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.Uint64;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
writeTx.merge(car1Path, car1);
- final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+ final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
writeTx.merge(car2Path, car2);
for (int i = 0; i < numCars; i++) {
writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.newCarPath("car" + i),
- CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
followerTestKit.doCommit(writeTx.ready());
rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
rwTx.merge(CarsModel.newCarPath("optima"), car1);
verifyCars(rwTx, car1);
- final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+ final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
rwTx.merge(car2Path, car2);
rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
rwTx.write(car1Path, car1);
verifyCars(rwTx, car1);
- final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+ final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000));
rwTx.merge(CarsModel.newCarPath("sportage"), car2);
rwTx.delete(car1Path);
final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
- final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
readWriteTx.write(carPath, car);
writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
- MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
writeTx.merge(car1Path, car1);
new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
- final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
// Send another tx without immediate commit.
modification = dataTree.takeSnapshot().newModification();
- MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+ MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
- final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
// Send another tx without immediate commit.
modification = dataTree.takeSnapshot().newModification();
- MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000));
+ MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
forwardedReady = new ForwardedReadyTransaction(tx2,
final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction();
final LinkedList<MapEntryNode> cars = new LinkedList<>();
int carIndex = 1;
- cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
carIndex++;
NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
for (int i = 1; i <= 5; i++, carIndex++) {
- cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
}
// message on ready.
final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
- cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
carIndex++;
// leader shard on ready.
final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
- cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
+ cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
- final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
writeTx.write(CarsModel.newCarPath("optima"), car);
final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
SchemaContextHelper.full());
final ContainerNode carsNode = CarsModel.newCarsNode(
- CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
+ CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000))));
AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
}
+ @SuppressWarnings("IllegalCatch")
+ @Test
+ public void testRaftCallbackDuringLeadershipDrop() throws Exception {
+ final String testName = "testRaftCallbackDuringLeadershipDrop";
+ initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+ DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
+ .shardLeaderElectionTimeoutInSeconds(3600),
+ commitTimeout);
+
+ final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ leaderTestKit.doCommit(initialWriteTx.ready());
+
+ try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+ testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+
+ final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
+ .getLocalShards().get("cars").getActor();
+ final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
+ .getLocalShards().get("cars").getActor();
+ member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
+ member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
+
+ final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ final AtomicBoolean submitDone = new AtomicBoolean(false);
+ executor.submit(() -> {
+ try {
+ leaderTestKit.doCommit(newTx.ready());
+ submitDone.set(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
+ .getLocalShards().get("cars").getActor();
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
+
+ final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
+ .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
+
+ // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
+ // new term(switching to candidate after election timeout)
+ leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
+ "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
+ -1), member3Cars);
+
+ member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
+ member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
+
+ await("Is tx stuck in COMMIT_PENDING")
+ .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
+
+ }
+
+ executor.shutdownNow();
+ }
+
private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
final NormalizedNode<?, ?> expRoot) {
assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());