Bug 7805: Add make-leader-local rpc for module based shard.
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index 3667131a81262ba17e2d1273783c8827029e44fa..ab85331c14d57a052a5c84e92713b89a8adaffb1 100644 (file)
@@ -33,14 +33,19 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
@@ -53,16 +58,20 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,11 +87,19 @@ public class ClusterAdminRpcService implements ClusterAdminService {
 
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
+    private final BindingNormalizedNodeSerializer serializer;
+    private final Timeout makeLeaderLocalTimeout;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
-            DistributedDataStoreInterface operDataStore) {
+            DistributedDataStoreInterface operDataStore,
+            BindingNormalizedNodeSerializer serializer) {
         this.configDataStore = configDataStore;
         this.operDataStore = operDataStore;
+        this.serializer = serializer;
+
+        this.makeLeaderLocalTimeout =
+                new Timeout(configDataStore.getActorContext().getDatastoreContext()
+                        .getShardLeaderElectionTimeout().duration().$times(2));
     }
 
     @Override
@@ -157,6 +174,139 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+    @Override
+    public Future<RpcResult<Void>> makeLeaderLocal(final MakeLeaderLocalInput input) {
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType);
+
+        ActorContext actorContext = dataStoreType == DataStoreType.Config
+                ? configDataStore.getActorContext()
+                : operDataStore.getActorContext();
+
+        final scala.concurrent.Future<ActorRef> localShardReply =
+                actorContext.findLocalShardAsync(shardName);
+
+        final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
+        localShardReply.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+                if (failure != null) {
+                    LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
+                                    + " local shard.", shardName, failure);
+                    makeLeaderLocalAsk.failure(failure);
+                } else {
+                    makeLeaderLocalAsk
+                            .completeWith(actorContext
+                                    .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
+                }
+            }
+        }, actorContext.getClientDispatcher());
+
+        final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+        makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+                if (failure != null) {
+                    LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
+                    future.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
+                            "leadership transfer failed", failure).build());
+                    return;
+                }
+
+                LOG.debug("Leadership transfer complete {}.", success);
+                future.set(RpcResultBuilder.<Void>success().build());
+            }
+        }, actorContext.getClientDispatcher());
+
+        return future;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {
+
+        final InstanceIdentifier<?> identifier = input.getShardPrefix();
+        if (identifier == null) {
+            return newFailedRpcResultFuture("A valid shard identifier must be specified");
+        }
+
+        final DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType);
+
+        final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix));
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(Success success) {
+                LOG.info("Successfully added replica for shard {}", prefix);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to add replica for shard %s", prefix),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) {
+
+        final InstanceIdentifier<?> identifier = input.getShardPrefix();
+        if (identifier == null) {
+            return newFailedRpcResultFuture("A valid shard identifier must be specified");
+        }
+
+        final DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        final String memberName = input.getMemberName();
+        if (Strings.isNullOrEmpty(memberName)) {
+            return newFailedRpcResultFuture("A valid member name must be specified");
+        }
+
+        LOG.info("Removing replica for shard {} memberName {}, datastoreType {}",
+                identifier, memberName, dataStoreType);
+        final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        final ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
+                new RemovePrefixShardReplica(prefix, MemberName.forName(memberName)));
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(final Success success) {
+                LOG.info("Successfully removed replica for shard {}", prefix);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                onMessageFailure(String.format("Failed to remove replica for shard %s", prefix),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
+    }
+
     @Override
     public Future<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards() {
         LOG.info("Adding replicas for all shards");