X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=1bda653fc2ed63814b5706fd0404734bcbfa8bff;hp=ab85331c14d57a052a5c84e92713b89a8adaffb1;hb=refs%2Fchanges%2F75%2F64275%2F1;hpb=f4785825f5fb572e31fc1e656ce4134f3aa38293 diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index ab85331c14..1bda653fc2 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -12,12 +12,14 @@ import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; + import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.io.FileOutputStream; import java.io.IOException; @@ -37,12 +39,15 @@ import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardRepl import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; +import org.opendaylight.controller.cluster.datastore.messages.GetShardRole; +import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply; import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput; @@ -58,6 +63,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; @@ -130,7 +141,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { onMessageFailure(String.format("Failed to add replica for shard %s", shardName), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -169,7 +180,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { onMessageFailure(String.format("Failed to remove replica for shard %s", shardName), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -186,12 +197,13 @@ public class ClusterAdminRpcService implements ClusterAdminService { return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } - LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType); - ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext() : operDataStore.getActorContext(); + LOG.info("Moving leader to local node {} for shard {}, datastoreType {}", + actorContext.getCurrentMemberName().getName(), shardName, dataStoreType); + final scala.concurrent.Future localShardReply = actorContext.findLocalShardAsync(shardName); @@ -222,7 +234,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { return; } - LOG.debug("Leadership transfer complete {}.", success); + LOG.debug("Leadership transfer complete"); future.set(RpcResultBuilder.success().build()); } }, actorContext.getClientDispatcher()); @@ -260,7 +272,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { onMessageFailure(String.format("Failed to add replica for shard %s", prefix), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -302,7 +314,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { onMessageFailure(String.format("Failed to remove replica for shard %s", prefix), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -312,7 +324,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.info("Adding replicas for all shards"); final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = shardName -> new AddShardReplica(shardName); + Function messageSupplier = AddShardReplica::new; sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); @@ -381,7 +393,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -411,8 +423,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { @Override public Future> flipMemberVotingStatesForAllShards() { final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); - Function messageSupplier = shardName -> - new FlipShardMembersVotingStatus(shardName); + Function messageSupplier = FlipShardMembersVotingStatus::new; LOG.info("Flip member voting states for all shards"); @@ -424,6 +435,95 @@ public class ClusterAdminRpcService implements ClusterAdminService { "Failed to change member voting states"); } + @Override + public Future> getShardRole(final GetShardRoleInput input) { + final String shardName = input.getShardName(); + if (Strings.isNullOrEmpty(shardName)) { + return newFailedRpcResultFuture("A valid shard name must be specified"); + } + + DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + LOG.info("Getting role for shard {}, datastore type {}", shardName, dataStoreType); + + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture future = sendMessageToShardManager(dataStoreType, + new GetShardRole(shardName)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final GetShardRoleReply reply) { + if (reply == null) { + returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "No Shard role present. Please retry..").build()); + return; + } + LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName); + final GetShardRoleOutputBuilder builder = new GetShardRoleOutputBuilder(); + if (reply.getRole() != null) { + builder.setRole(reply.getRole()); + } + returnFuture.set(newSuccessfulResult(builder.build())); + } + + @Override + public void onFailure(final Throwable failure) { + returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to get shard role.", failure).build()); + } + }, MoreExecutors.directExecutor()); + + return returnFuture; + } + + @Override + public Future> getPrefixShardRole(final GetPrefixShardRoleInput input) { + final InstanceIdentifier identifier = input.getShardPrefix(); + if (identifier == null) { + return newFailedRpcResultFuture("A valid shard identifier must be specified"); + } + + final DataStoreType dataStoreType = input.getDataStoreType(); + if (dataStoreType == null) { + return newFailedRpcResultFuture("A valid DataStoreType must be specified"); + } + + LOG.info("Getting prefix shard role for shard: {}, datastore type {}", identifier, dataStoreType); + + final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier); + final String shardName = ClusterUtils.getCleanShardName(prefix); + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture future = sendMessageToShardManager(dataStoreType, + new GetShardRole(shardName)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final GetShardRoleReply reply) { + if (reply == null) { + returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "No Shard role present. Please retry..").build()); + return; + } + + LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName); + final GetPrefixShardRoleOutputBuilder builder = new GetPrefixShardRoleOutputBuilder(); + if (reply.getRole() != null) { + builder.setRole(reply.getRole()); + } + returnFuture.set(newSuccessfulResult(builder.build())); + } + + @Override + public void onFailure(final Throwable failure) { + returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder( + "Failed to get shard role.", failure).build()); + } + }, MoreExecutors.directExecutor()); + + return returnFuture; + } + @Override public Future> backupDatastore(final BackupDatastoreInput input) { LOG.debug("backupDatastore: {}", input); @@ -444,7 +544,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { public void onFailure(Throwable failure) { onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); } - }); + }, MoreExecutors.directExecutor()); return returnFuture; } @@ -455,10 +555,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { for (MemberVotingState memberStatus: memberVotingStatus) { serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting()); } - - ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName, - serverVotingStatusMap); - return changeVotingStatus; + return new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap); } private static SettableFuture> waitForShardResults( @@ -498,7 +595,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults))); } } - }); + }, MoreExecutors.directExecutor()); } return returnFuture; } @@ -513,8 +610,8 @@ public class ClusterAdminRpcService implements ClusterAdminService { LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName()); for (String shardName: allShardNames) { - ListenableFuture future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName), - SHARD_MGR_TIMEOUT); + ListenableFuture future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName), + SHARD_MGR_TIMEOUT); shardResultData.add(new SimpleEntry<>(future, new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType))); } @@ -593,10 +690,10 @@ public class ClusterAdminRpcService implements ClusterAdminService { } private static RpcResult newSuccessfulResult() { - return newSuccessfulResult((Void)null); + return newSuccessfulResult(null); } private static RpcResult newSuccessfulResult(T data) { - return RpcResultBuilder.success(data).build(); + return RpcResultBuilder.success(data).build(); } }