X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=e403bbc20a2ccc5837121d6e502eaa72d2b53c2d;hb=f9ee2cce797cf12402dd55c406f3e270d7d2e20d;hp=95149a1809864545e6a163bf66450d7349b7ee11;hpb=45371f6048ab259f7ed536962bd081d1eb5ae2ef;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 95149a1809..e403bbc20a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -24,6 +24,7 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; +import akka.cluster.Member; import akka.dispatch.Futures; import akka.pattern.Patterns; import akka.testkit.javadsl.TestKit; @@ -36,7 +37,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; -import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -81,7 +81,10 @@ import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardData import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -101,6 +104,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; @@ -265,11 +269,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); writeTx.merge(car2Path, car2); @@ -354,7 +358,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); followerTestKit.doCommit(writeTx.ready()); @@ -473,12 +477,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); rwTx.merge(CarsModel.newCarPath("optima"), car1); verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); final YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage"); rwTx.merge(car2Path, car2); @@ -562,7 +566,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); rwTx.write(car1Path, car1); @@ -570,7 +574,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyCars(rwTx, car1); - final MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000)); + final MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(25000)); rwTx.merge(CarsModel.newCarPath("sportage"), car2); rwTx.delete(car1Path); @@ -603,7 +607,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); readWriteTx.write(carPath, car); @@ -756,7 +760,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); writeTx.merge(car1Path, car1); @@ -785,7 +789,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); @@ -804,7 +808,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); @@ -852,7 +856,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification); new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification); - final MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, @@ -873,7 +877,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Send another tx without immediate commit. modification = dataTree.takeSnapshot().newModification(); - MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000)); + MapEntryNode car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000)); new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); forwardedReady = new ForwardedReadyTransaction(tx2, @@ -943,7 +947,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx2 = followerDistributedDataStore.newWriteOnlyTransaction(); final LinkedList cars = new LinkedList<>(); int carIndex = 1; - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME) @@ -958,7 +962,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction(); for (int i = 1; i <= 5; i++, carIndex++) { - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); } @@ -966,7 +970,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // message on ready. final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); carIndex++; @@ -974,7 +978,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // leader shard on ready. final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction(); - cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); + cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", @@ -1046,7 +1050,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); @@ -1250,6 +1254,60 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSemiReachableCandidateNotDroppingLeader() throws Exception { + final String testName = "testSemiReachableCandidateNotDroppingLeader"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); + final IntegrationTestKit follower2TestKit = new IntegrationTestKit( + follower2System, follower2DatastoreContextBuilder, commitTimeout); + + final AbstractDataStore ds2 = + follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS); + + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + + TestKit.shutdownActorSystem(follower2System); + + ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + Cluster leaderCluster = Cluster.get(leaderSystem); + Cluster followerCluster = Cluster.get(followerSystem); + Cluster follower2Cluster = Cluster.get(follower2System); + + Member follower2Member = follower2Cluster.readView().self(); + + await().atMost(10, TimeUnit.SECONDS) + .until(() -> leaderCluster.readView().unreachableMembers().contains(follower2Member)); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> followerCluster.readView().unreachableMembers().contains(follower2Member)); + + ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get(); + + // to simulate a follower not being able to receive messages, but still being able to send messages and becoming + // candidate, we can just send a couple of RequestVotes to both leader and follower. + cars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 1, "member-3-shard-cars", -1, -1), null); + cars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + followerCars.tell(new RequestVote(initialState.getCurrentTerm() + 3, "member-3-shard-cars", -1, -1), null); + + OnDemandRaftState stateAfter = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + OnDemandRaftState followerState = (OnDemandRaftState) followerDistributedDataStore.getActorUtils() + .executeOperation(cars, GetOnDemandRaftState.INSTANCE); + + assertEquals(initialState.getCurrentTerm(), stateAfter.getCurrentTerm()); + assertEquals(initialState.getCurrentTerm(), followerState.getCurrentTerm()); + + ds2.close(); + } + @Test public void testInstallSnapshot() throws Exception { final String testName = "testInstallSnapshot"; @@ -1263,10 +1321,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { SchemaContextHelper.full()); final ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)))); AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode); - final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.EMPTY); + final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty()); final Snapshot initialSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)), Collections.emptyList(), 5, 1, 5, 1, 1, null, null);