X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=ab85331c14d57a052a5c84e92713b89a8adaffb1;hp=d51753c3eac0ac16225bed70909afb3c536c9e9d;hb=f4785825f5fb572e31fc1e656ce4134f3aa38293;hpb=2f77e92af7a68b4a97dbfb709c6cc9b11a49878a diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index d51753c3ea..ab85331c14 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -33,14 +33,19 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; +import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; +import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; @@ -53,16 +58,20 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,11 +87,19 @@ public class ClusterAdminRpcService implements ClusterAdminService { private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; + private final BindingNormalizedNodeSerializer serializer; + private final Timeout makeLeaderLocalTimeout; public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore, - DistributedDataStoreInterface operDataStore) { + DistributedDataStoreInterface operDataStore, + BindingNormalizedNodeSerializer serializer) { this.configDataStore = configDataStore; this.operDataStore = operDataStore; + this.serializer = serializer; + + this.makeLeaderLocalTimeout = + new Timeout(configDataStore.getActorContext().getDatastoreContext() + .getShardLeaderElectionTimeout().duration().$times(2)); } @Override @@ -157,6 +174,139 @@ public class ClusterAdminRpcService implements ClusterAdminService { return returnFuture; } + @Override + public Future> makeLeaderLocal(final MakeLeaderLocalInput input) { + final String shardName = input.getShardName(); + if (Strings.isNullOrEmpty(shardName)) { + return newFailedRpcResultFuture("A valid shard name must be specified"); + } + + DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType); + + ActorContext actorContext = dataStoreType == DataStoreType.Config + ? configDataStore.getActorContext() + : operDataStore.getActorContext(); + + final scala.concurrent.Future localShardReply = + actorContext.findLocalShardAsync(shardName); + + final scala.concurrent.Promise makeLeaderLocalAsk = akka.dispatch.Futures.promise(); + localShardReply.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable { + if (failure != null) { + LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to" + + " local shard.", shardName, failure); + makeLeaderLocalAsk.failure(failure); + } else { + makeLeaderLocalAsk + .completeWith(actorContext + .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout)); + } + } + }, actorContext.getClientDispatcher()); + + final SettableFuture> future = SettableFuture.create(); + makeLeaderLocalAsk.future().onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object success) throws Throwable { + if (failure != null) { + LOG.error("Leadership transfer failed for shard {}.", shardName, failure); + future.set(RpcResultBuilder.failed().withError(ErrorType.APPLICATION, + "leadership transfer failed", failure).build()); + return; + } + + LOG.debug("Leadership transfer complete {}.", success); + future.set(RpcResultBuilder.success().build()); + } + }, actorContext.getClientDispatcher()); + + return future; + } + + @Override + public Future> addPrefixShardReplica(final AddPrefixShardReplicaInput input) { + + final InstanceIdentifier identifier = input.getShardPrefix(); + if (identifier == null) { + return newFailedRpcResultFuture("A valid shard identifier must be specified"); + } + + final DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType); + + final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Success success) { + LOG.info("Successfully added replica for shard {}", prefix); + returnFuture.set(newSuccessfulResult()); + } + + @Override + public void onFailure(Throwable failure) { + onMessageFailure(String.format("Failed to add replica for shard %s", prefix), + returnFuture, failure); + } + }); + + return returnFuture; + } + + @Override + public Future> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) { + + final InstanceIdentifier identifier = input.getShardPrefix(); + if (identifier == null) { + return newFailedRpcResultFuture("A valid shard identifier must be specified"); + } + + final DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + final String memberName = input.getMemberName(); + if (Strings.isNullOrEmpty(memberName)) { + return newFailedRpcResultFuture("A valid member name must be specified"); + } + + LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", + identifier, memberName, dataStoreType); + final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); + + final SettableFuture> returnFuture = SettableFuture.create(); + final ListenableFuture future = sendMessageToShardManager(dataStoreType, + new RemovePrefixShardReplica(prefix, MemberName.forName(memberName))); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Success success) { + LOG.info("Successfully removed replica for shard {}", prefix); + returnFuture.set(newSuccessfulResult()); + } + + @Override + public void onFailure(final Throwable failure) { + onMessageFailure(String.format("Failed to remove replica for shard %s", prefix), + returnFuture, failure); + } + }); + + return returnFuture; + } + @Override public Future> addReplicasForAllShards() { LOG.info("Adding replicas for all shards");