From: Tomas Cere Date: Thu, 30 Mar 2017 12:44:32 +0000 (+0200) Subject: Bug 7805: Add make-leader-local rpc for module based shard. X-Git-Tag: release/carbon~102 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f4785825f5fb572e31fc1e656ce4134f3aa38293;hp=be338c9e1dab83e2a5ff21819b92b934ef32faee Bug 7805: Add make-leader-local rpc for module based shard. csit testing scenarios require movement of the shard leader for module based shards aswell so add this into ClusterAdminRpcService. Change-Id: Ib8a310cdba728c0a42d8850703740bf4698adbe0 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang b/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang index 8a3c58a162..079b256840 100644 --- a/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang +++ b/opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang @@ -109,6 +109,29 @@ module cluster-admin { described in the Raft paper."; } + rpc make-leader-local { + input { + leaf shard-name { + mandatory true; + type string; + description "The name of the shard for which to move the leader to the local node"; + } + + leaf data-store-type { + mandatory true; + type data-store-type; + description "The type of the data store to which the shard belongs"; + } + } + + description "Attempts to move the shard leader of the given module based shard to the local node. + The rpc returns a response after handling of the underlying MakeLeaderLocal message completes. + This operation fails if there is no current shard leader due to lack of network connectivity or + a cluster majority. In addition, if the local node is not up to date with the current leader, + an attempt is made to first sync the local node with the leader. If this cannot be achieved + within two election timeout periods the operation fails."; + } + rpc add-prefix-shard-replica { input { leaf shard-prefix { diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 3627bd80fa..ab85331c14 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardRepl import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; @@ -57,6 +58,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; @@ -86,6 +88,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; private final BindingNormalizedNodeSerializer serializer; + private final Timeout makeLeaderLocalTimeout; public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore, DistributedDataStoreInterface operDataStore, @@ -93,6 +96,10 @@ public class ClusterAdminRpcService implements ClusterAdminService { this.configDataStore = configDataStore; this.operDataStore = operDataStore; this.serializer = serializer; + + this.makeLeaderLocalTimeout = + new Timeout(configDataStore.getActorContext().getDatastoreContext() + .getShardLeaderElectionTimeout().duration().$times(2)); } @Override @@ -167,6 +174,62 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + @Override + public Future> makeLeaderLocal(final MakeLeaderLocalInput input) { + final String shardName = input.getShardName(); + if (Strings.isNullOrEmpty(shardName)) { + return newFailedRpcResultFuture("A valid shard name must be specified"); + } + + DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType); + + ActorContext actorContext = dataStoreType == DataStoreType.Config + ? configDataStore.getActorContext() + : operDataStore.getActorContext(); + + final scala.concurrent.Future localShardReply = + actorContext.findLocalShardAsync(shardName); + + final scala.concurrent.Promise makeLeaderLocalAsk = akka.dispatch.Futures.promise(); + localShardReply.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable { + if (failure != null) { + LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to" + + " local shard.", shardName, failure); + makeLeaderLocalAsk.failure(failure); + } else { + makeLeaderLocalAsk + .completeWith(actorContext + .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout)); + } + } + }, actorContext.getClientDispatcher()); + + final SettableFuture> future = SettableFuture.create(); + makeLeaderLocalAsk.future().onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure != null) { + LOG.error("Leadership transfer failed for shard {}.", shardName, failure); + future.set(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + "leadership transfer failed", failure).build()); + return; + } + + LOG.debug("Leadership transfer complete {}.", success); + future.set(RpcResultBuilder.success().build()); + } + }, actorContext.getClientDispatcher()); + + return future; + } + @Override public Future> addPrefixShardReplica(final AddPrefixShardReplicaInput input) { diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 5cca07428f..90c6cd1b58 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -88,6 +88,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; @@ -232,6 +233,40 @@ public class ClusterAdminRpcServiceTest { verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH)); } + @Test + public void testModuleShardLeaderMovement() throws Exception { + String name = "testModuleShardLeaderMovement"; + String moduleShardsConfig = "module-shards-member1.conf"; + + final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name) + .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build(); + final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name) + .moduleShardsConfig(moduleShardsConfig).build(); + + member1.waitForMembersUp("member-2", "member-3"); + + doAddShardReplica(replicaNode2, "cars", "member-1"); + doAddShardReplica(replicaNode3, "cars", "member-1", "member-2"); + + verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3"); + + verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3"); + + verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2"); + + doMakeShardLeaderLocal(member1, "cars", "member-1"); + replicaNode2.kit().waitUntilLeader(replicaNode2.configDataStore().getActorContext(), "cars"); + replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars"); + + doMakeShardLeaderLocal(replicaNode2, "cars", "member-2"); + member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "cars"); + replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars"); + + doMakeShardLeaderLocal(replicaNode3, "cars", "member-3"); + } + @Test public void testAddShardReplica() throws Exception { String name = "testAddShardReplica"; @@ -403,6 +438,22 @@ public class ClusterAdminRpcServiceTest { verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames); } + private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader) + throws Exception { + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(), + memberNode.operDataStore(), null); + + final RpcResult rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder() + .setDataStoreType(DataStoreType.Config).setShardName(shardName).build()) + .get(10, TimeUnit.SECONDS); + + verifySuccessfulRpcResult(rpcResult); + + verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(), + containsString(newLeader))); + + } + private static T verifySuccessfulRpcResult(RpcResult rpcResult) { if (!rpcResult.isSuccessful()) { if (rpcResult.getErrors().size() > 0) {