package org.opendaylight.controller.cluster.datastore;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.junit.After;
import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
-import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
-import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
}
- @SuppressWarnings("IllegalCatch")
- @Test
- public void testRaftCallbackDuringLeadershipDrop() throws Exception {
- final String testName = "testRaftCallbackDuringLeadershipDrop";
- initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
-
- final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
- DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
- .shardLeaderElectionTimeoutInSeconds(3600),
- commitTimeout);
-
- final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
- initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
- leaderTestKit.doCommit(initialWriteTx.ready());
-
- try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
-
- final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
- .getLocalShards().get("cars").getActor();
- final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
- .getLocalShards().get("cars").getActor();
- member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
- member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
-
- final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
- newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
- final AtomicBoolean submitDone = new AtomicBoolean(false);
- executor.submit(() -> {
- try {
- leaderTestKit.doCommit(newTx.ready());
- submitDone.set(true);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
- .getLocalShards().get("cars").getActor();
- await().atMost(10, TimeUnit.SECONDS)
- .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
- .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
-
- final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
- .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
-
- // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
- // new term(switching to candidate after election timeout)
- leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
- "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
- -1), member3Cars);
-
- member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
- member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
-
- await("Is tx stuck in COMMIT_PENDING")
- .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
-
- }
-
- executor.shutdownNow();
- }
-
private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
final NormalizedNode<?, ?> expRoot) {
assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());