Bug 7805: Add make-leader-local rpc for module based shard. 00/54100/13
author Tomas Cere <tcere@cisco.com>
Thu, 30 Mar 2017 12:44:32 +0000 (14:44 +0200)
committer Tom Pantelis <tompantelis@gmail.com>
Thu, 6 Apr 2017 16:38:39 +0000 (16:38 +0000)
CSIT testing scenarios require movement of the shard leader for module-based
shards as well, so add this to ClusterAdminRpcService.

Change-Id: Ib8a310cdba728c0a42d8850703740bf4698adbe0
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index 8a3c58a16279f25d7e4a8d2375ea52c898ef4e40..079b256840ad89afa2d74eb102e6bb37e7545623 100644
@@ -109,6 +109,29 @@ module cluster-admin {
             described in the Raft paper.";
     }
 
+    rpc make-leader-local {
+        input {
+            leaf shard-name {
+                mandatory true;
+                type string;
+                description "The name of the shard for which to move the leader to the local node";
+            }
+
+            leaf data-store-type {
+                mandatory true;
+                type data-store-type;
+                description "The type of the data store to which the shard belongs";
+            }
+        }
+
+        description "Attempts to move the shard leader of the given module-based shard to the local node.
+                The RPC returns a response after handling of the underlying MakeLeaderLocal message completes.
+                This operation fails if there is no current shard leader due to lack of network connectivity or
+                a cluster majority. In addition, if the local node is not up to date with the current leader,
+                an attempt is made to first sync the local node with the leader. If this cannot be achieved
+                within two election timeout periods, the operation fails.";
+    }
+
     rpc add-prefix-shard-replica {
         input {
             leaf shard-prefix {
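
For context, callers reach the new RPC through the generated MD-SAL bindings. A minimal
sketch, assuming `rpcService` is a ClusterAdminService instance obtained from the RPC
registry (the builder and input class names match the generated classes used in the
implementation and test below):

    // A minimal sketch: invoking make-leader-local through the generated
    // bindings. `rpcService` is assumed to be a ClusterAdminService obtained
    // from the MD-SAL RPC registry.
    MakeLeaderLocalInput input = new MakeLeaderLocalInputBuilder()
            .setShardName("cars")
            .setDataStoreType(DataStoreType.Config)
            .build();
    Future<RpcResult<Void>> result = rpcService.makeLeaderLocal(input);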
index 3627bd80facea719dddddcd61065218efff1d2d7..ab85331c14d57a052a5c84e92713b89a8adaffb1 100644
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardRepl
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
@@ -57,6 +58,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
@@ -86,6 +88,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
     private final BindingNormalizedNodeSerializer serializer;
+    private final Timeout makeLeaderLocalTimeout;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
             DistributedDataStoreInterface operDataStore,
@@ -93,6 +96,10 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         this.configDataStore = configDataStore;
         this.operDataStore = operDataStore;
         this.serializer = serializer;
+
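+        // Allow the transfer up to two shard leader election timeouts, matching
+        // the contract documented for make-leader-local in cluster-admin.yang.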
+        this.makeLeaderLocalTimeout =
+                new Timeout(configDataStore.getActorContext().getDatastoreContext()
+                        .getShardLeaderElectionTimeout().duration().$times(2));
     }
 
     @Override
@@ -167,6 +174,62 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+    @Override
+    public Future<RpcResult<Void>> makeLeaderLocal(final MakeLeaderLocalInput input) {
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType);
+
+        ActorContext actorContext = dataStoreType == DataStoreType.Config
+                ? configDataStore.getActorContext()
+                : operDataStore.getActorContext();
+
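+        // First locate the local shard actor; the MakeLeaderLocal request is
+        // then sent to that replica, asking it to take over shard leadership.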
+        final scala.concurrent.Future<ActorRef> localShardReply =
+                actorContext.findLocalShardAsync(shardName);
+
+        final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
+        localShardReply.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+                if (failure != null) {
+                    LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
+                                    + " local shard.", shardName, dataStoreType, failure);
+                    makeLeaderLocalAsk.failure(failure);
+                } else {
+                    makeLeaderLocalAsk
+                            .completeWith(actorContext
+                                    .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
+                }
+            }
+        }, actorContext.getClientDispatcher());
+
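+        // Bridge the Akka completion back to the RpcResult future returned to the RPC caller.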
+        final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+        makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) {
+                    LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
+                    future.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
+                            "leadership transfer failed", failure).build());
+                    return;
+                }
+
+                LOG.debug("Leadership transfer complete {}.", success);
+                future.set(RpcResultBuilder.<Void>success().build());
+            }
+        }, actorContext.getClientDispatcher());
+
+        return future;
+    }
+
     @Override
     public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {
 
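A note on the timeout construction above: `$times` is the JVM-level name of Scala's `*`
operator on FiniteDuration, which is why the Java code calls the mangled name directly to
double the shard leader election timeout. A standalone sketch (the five-second base value
is illustrative only):

    import java.util.concurrent.TimeUnit;

    import akka.util.Timeout;
    import scala.concurrent.duration.Duration;
    import scala.concurrent.duration.FiniteDuration;

    class TimeoutSketch {
        static Timeout twoElectionTimeouts() {
            // Scala's `electionTimeout * 2` compiles to the JVM method name
            // `$times`, so Java must call the mangled name directly.
            FiniteDuration electionTimeout = Duration.create(5, TimeUnit.SECONDS);
            return new Timeout(electionTimeout.$times(2L));
        }
    }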
index 5cca07428f2e61ddf2b6700bb32e55a7abfb1b67..90c6cd1b58aec2b77f51df7db50736b72e9f6d99 100644
@@ -88,6 +88,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
@@ -232,6 +233,40 @@ public class ClusterAdminRpcServiceTest {
         verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
     }
 
+    @Test
+    public void testModuleShardLeaderMovement() throws Exception {
+        String name = "testModuleShardLeaderMovement";
+        String moduleShardsConfig = "module-shards-member1.conf";
+
+        final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+                .waitForShardLeader("cars").moduleShardsConfig(moduleShardsConfig).build();
+        final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+                .moduleShardsConfig(moduleShardsConfig).build();
+        final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
+                .moduleShardsConfig(moduleShardsConfig).build();
+
+        member1.waitForMembersUp("member-2", "member-3");
+
+        doAddShardReplica(replicaNode2, "cars", "member-1");
+        doAddShardReplica(replicaNode3, "cars", "member-1", "member-2");
+
+        verifyRaftPeersPresent(member1.configDataStore(), "cars", "member-2", "member-3");
+
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
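+        // Exercise the RPC from each member in turn, verifying the expected
+        // leader after each transfer.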
+        doMakeShardLeaderLocal(member1, "cars", "member-1");
+        replicaNode2.kit().waitUntilLeader(replicaNode2.configDataStore().getActorContext(), "cars");
+        replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
+
+        doMakeShardLeaderLocal(replicaNode2, "cars", "member-2");
+        member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "cars");
+        replicaNode3.kit().waitUntilLeader(replicaNode3.configDataStore().getActorContext(), "cars");
+
+        doMakeShardLeaderLocal(replicaNode3, "cars", "member-3");
+    }
+
     @Test
     public void testAddShardReplica() throws Exception {
         String name = "testAddShardReplica";
@@ -403,6 +438,22 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
     }
 
+    private static void doMakeShardLeaderLocal(final MemberNode memberNode, String shardName, String newLeader)
+            throws Exception {
+        ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+                memberNode.operDataStore(), null);
+
+        final RpcResult<Void> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
+                .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
+                .get(10, TimeUnit.SECONDS);
+
+        verifySuccessfulRpcResult(rpcResult);
+
+        verifyRaftState(memberNode.configDataStore(), shardName, raftState -> assertThat(raftState.getLeader(),
+                containsString(newLeader)));
+    }
+
     private static <T> T verifySuccessfulRpcResult(RpcResult<T> rpcResult) {
         if (!rpcResult.isSuccessful()) {
             if (rpcResult.getErrors().size() > 0) {