*/
package org.opendaylight.controller.cluster.datastore;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.common.api.TransactionChainListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+ // Switch to the Test* subclasses — presumably so tests can use extra instrumentation
+ // such as TestShard.RequestFrontendMetadata (see imports above); confirm against the subclasses.
+ // NOTE(review): the meaning of the second element (7 / 12) is not visible in this hunk.
+ { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
});
}
followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
testParameter, type, moduleShardsConfig, false, shards);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
leaderTestKit.waitForMembersUp("member-2");
followerTestKit.waitForMembersUp("member-1");
}
}
+ // Verifies that back-to-back single-shard commits do not leak per-transaction frontend
+ // metadata on the leader: after the writes/reads below, all transaction ids must appear
+ // in the purged range and none may remain in the closed-transactions set.
+ @Test
+ public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+ final String testName = "testWriteTransactionWithSingleShard";
+ initDatastoresWithCars(testName);
+
+ final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ // Initial commit: create the cars container and an empty car list.
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ int numCars = 5;
+ for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+ followerTestKit.doCommit(writeTx.ready());
+
+ // Interleave a read-only transaction after each commit; these also consume
+ // transaction ids that must eventually be purged.
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
+ }
+
+ // wait to let the shard catch up with purged
+ await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Optional<ActorRef> localShard =
+ leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ // Locate the history created by the transaction chain (history id 1);
+ // the iterator may also yield the single-transaction history (id 0).
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+
+ assertEquals(0, metadata.getClosedTransactions().size());
+ // 11 transactions total: 1 initial commit + numCars writes + numCars reads,
+ // i.e. ids 0..10, hence the purged range [0, 11).
+ assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+ metadata.getPurgedTransactions().asRanges().iterator().next());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
+ });
+
+ final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ }
+
+ // Verifies that write transactions which are closed (aborted) without ever being readied
+ // still get their ids purged from the leader's frontend metadata.
+ @Test
+ @Ignore("Flushes out tell based leak needs to be handled separately")
+ public void testCloseTransactionMetadataLeak() throws Exception {
+ // Ask based frontend seems to have some issues with back to back close.
+ // Match the X.class.isAssignableFrom(testParameter) direction used elsewhere in this file;
+ // the original testParameter.isAssignableFrom(...) form would also match unrelated supertypes.
+ Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
+
+ final String testName = "testWriteTransactionWithSingleShard";
+ initDatastoresWithCars(testName);
+
+ final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ int numCars = 5;
+ for (int i = 0; i < numCars; i++) {
+ // Open and immediately close a write transaction — nothing is written or committed.
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.close();
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
+ }
+
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ // wait to let the shard catch up with purged
+ await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Optional<ActorRef> localShard =
+ leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ // Locate the transaction chain's history (id 1); id 0 is the
+ // single-transaction history.
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+
+ Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+ // All ids — including those of the closed-without-commit transactions —
+ // must have been purged into a single contiguous range.
+ assertEquals(0, metadata.getClosedTransactions().size());
+ assertEquals(1, ranges.size());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
+ });
+
+ final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ // No car entries are ever written in this test (the loop closes each write tx without
+ // writing), and the final commit re-writes an empty car list — so the list must be empty.
+ // The original expectation of numCars appears copied from the write-in-quick-succession test.
+ assertEquals("# cars", 0, ((Collection<?>) optional.get().getValue()).size());
+ }
+
@Test
public void testReadWriteTransactionWithSingleShard() throws Exception {
initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
MoreExecutors.directExecutor());
- final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
final DOMTransactionChain txChain = broker.createTransactionChain(listener);
final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
MoreExecutors.directExecutor());
- final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
final DOMTransactionChain txChain = broker.createTransactionChain(listener);
final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
TestKit.shutdownActorSystem(leaderSystem, true);
Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
- followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
+ followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorUtils(), CARS);
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
newMember1TestKit.setupAbstractDataStore(
testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
- followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
+ followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
// Write a car entry to the new leader - should switch to local Tx
@Test
public void testReadyLocalTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
- followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+ followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
- final com.google.common.base.Optional<ActorRef> carsFollowerShard =
- followerDistributedDataStore.getActorContext().findLocalShard("cars");
+ final Optional<ActorRef> carsFollowerShard =
+ followerDistributedDataStore.getActorUtils().findLocalShard("cars");
assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
final DataTree dataTree = new InMemoryDataTreeFactory().create(
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true,
- java.util.Optional.empty());
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
- readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty());
+ readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
- final ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+ final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
((ReadyTransactionReply)resp).getCohortPath());
final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorContext(), Arrays.asList(
+ leaderDistributedDataStore.getActorUtils(), Arrays.asList(
new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
@Test
public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
- followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
+ followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
- final com.google.common.base.Optional<ActorRef> carsFollowerShard =
- followerDistributedDataStore.getActorContext().findLocalShard("cars");
+ final Optional<ActorRef> carsFollowerShard =
+ followerDistributedDataStore.getActorUtils().findLocalShard("cars");
assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
- java.util.Optional.empty());
+ Optional.empty());
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
forwardedReady = new ForwardedReadyTransaction(tx2,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
- java.util.Optional.empty());
+ Optional.empty());
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
- ActorSelection txActor = leaderDistributedDataStore.getActorContext().actorSelection(
+ ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
((ReadyTransactionReply)resp).getCohortPath());
final Supplier<Short> versionSupplier = Mockito.mock(Supplier.class);
Mockito.doReturn(DataStoreVersions.CURRENT_VERSION).when(versionSupplier).get();
final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
- leaderDistributedDataStore.getActorContext(), Arrays.asList(
+ leaderDistributedDataStore.getActorUtils(), Arrays.asList(
new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor), versionSupplier)), tx2);
cohort.canCommit().get(5, TimeUnit.SECONDS);
cohort.preCommit().get(5, TimeUnit.SECONDS);
@Test
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
followerDatastoreContextBuilder.shardBatchedModificationCount(2);
leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
carIndex++;
- NormalizedNode<?, ?> people = PeopleModel.newPersonMapNode();
+ NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
+ .withChild(PeopleModel.newPersonEntry("Dude")).build();
writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
+ stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
// Disable elections on the leader so it switches to follower.
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
.shardElectionTimeoutFactor(10));
- leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
+ leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorUtils(), "cars");
// Submit all tx's - the messages should get queued for retry.
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
.customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
- IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars")
+ IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "cars")
.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people")
+ IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorUtils(), "people")
.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
@Test
public void testLeadershipTransferOnShutdown() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
final String testName = "testLeadershipTransferOnShutdown";
initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
- DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100),
+ DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
commitTimeout);
try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
.shardElectionTimeoutFactor(100));
final FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
- final Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+ final Future<ActorRef> future = leaderDistributedDataStore.getActorUtils().findLocalShardAsync("cars");
final ActorRef leaderActor = Await.result(future, duration);
final Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE);
@Test
public void testTransactionWithIsolatedLeader() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
// Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
final String testName = "testTransactionWithIsolatedLeader";
successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
// Stop the follower
- followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
+ followerTestKit.watch(followerDistributedDataStore.getActorUtils().getShardManager());
followerDistributedDataStore.close();
- followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
+ followerTestKit.expectTerminated(followerDistributedDataStore.getActorUtils().getShardManager());
// Submit the preIsolatedLeaderWriteTx so it's pending
final DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
|| e.getCause() instanceof ShardLeaderNotRespondingException);
} else {
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
} else {
assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
}
}
+ // Verifies that a partitioned member which can still send (but not receive) messages —
+ // and therefore keeps starting elections with ever-higher terms — does not cause the
+ // reachable leader/follower pair to abandon their current term.
+ @Test
+ public void testSemiReachableCandidateNotDroppingLeader() throws Exception {
+ final String testName = "testSemiReachableCandidateNotDroppingLeader";
+ initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+ final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
+ .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
+ final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
+ follower2System, follower2DatastoreContextBuilder, commitTimeout);
+
+ final AbstractDataStore ds2 =
+ follower2TestKit.setupAbstractDataStore(
+ testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+ // Ensure the store is released even if an assertion below fails.
+ try {
+ followerTestKit.waitForMembersUp("member-1", "member-3");
+ follower2TestKit.waitForMembersUp("member-1", "member-2");
+
+ // Take member-3 down so the remaining members mark it unreachable.
+ TestKit.shutdownActorSystem(follower2System);
+
+ ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+ OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+
+ Cluster leaderCluster = Cluster.get(leaderSystem);
+ Cluster followerCluster = Cluster.get(followerSystem);
+ Cluster follower2Cluster = Cluster.get(follower2System);
+
+ Member follower2Member = follower2Cluster.readView().self();
+
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member));
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member));
+
+ ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+
+ // to simulate a follower not being able to receive messages, but still being able to send messages and
+ // becoming candidate, we can just send a couple of RequestVotes to both leader and follower.
+ cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+ followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null);
+ cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+ followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null);
+
+ OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(cars, GetOnDemandRaftState.INSTANCE);
+ // Query the follower's own shard actor (followerCars), not the leader's ref — the original
+ // queried "cars" here, so the follower's term was never actually checked.
+ OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils()
+ .executeOperation(followerCars, GetOnDemandRaftState.INSTANCE);
+
+ // Neither reachable member should have adopted the candidate's inflated term.
+ assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm());
+ assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm());
+ } finally {
+ ds2.close();
+ }
+ }
+
@Test
public void testInstallSnapshot() throws Exception {
final String testName = "testInstallSnapshot";
CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
- final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY);
+ final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
final Snapshot initialSnapshot = Snapshot.create(
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
@Test
public void testReadWriteMessageSlicing() throws Exception {
// The slicing is only implemented for tell-based protocol
- Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+ Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
followerDatastoreContextBuilder.maximumMessageSliceSize(100);