Add optional timeout parameter for backup rpc
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index adc059c14e6e6fcf2be9bb1178ce9a953be6aa62..9ba40c5b1f04758b6a120a8766ad93a2e54205d4 100644 (file)
@@ -15,6 +15,8 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -30,8 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
@@ -40,6 +44,8 @@ import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardRepl
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
@@ -75,6 +81,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder;
@@ -96,19 +105,25 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 
 /**
  * Implements the yang RPCs defined in the generated ClusterAdminService interface.
@@ -275,8 +290,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
                 actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
 
-        final scala.concurrent.Future<ActorRef> localShardReply =
-                actorUtils.findLocalShardAsync(shardName);
+        final Future<ActorRef> localShardReply = actorUtils.findLocalShardAsync(shardName);
 
         final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
         localShardReply.onComplete(new OnComplete<ActorRef>() {
@@ -295,7 +309,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         }, actorUtils.getClientDispatcher());
 
         final SettableFuture<RpcResult<MakeLeaderLocalOutput>> future = SettableFuture.create();
-        makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
+        makeLeaderLocalAsk.future().onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable failure, final Object success) {
                 if (failure != null) {
@@ -609,9 +623,13 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             return newFailedRpcResultFuture("A valid file path must be specified");
         }
 
+        final Uint32 timeout = input.getTimeout();
+        final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
+                : SHARD_MGR_TIMEOUT;
+
         final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
-        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
-        Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
+        Futures.addCallback(future, new FutureCallback<>() {
             @Override
             public void onSuccess(final List<DatastoreSnapshot> snapshots) {
                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
@@ -626,6 +644,58 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+
+    @Override
+    public ListenableFuture<RpcResult<GetKnownClientsForAllShardsOutput>> getKnownClientsForAllShards(
+            final GetKnownClientsForAllShardsInput input) {
+        final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies =
+                getAllShardLeadersClients();
+        return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies),
+            MoreExecutors.directExecutor());
+    }
+
+    private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
+            final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
+        final Map<ShardResultKey, ShardResult> result = Maps.newHashMapWithExpectedSize(allShardReplies.size());
+        for (Entry<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> entry : allShardReplies.entrySet()) {
+            final ListenableFuture<GetKnownClientsReply> future = entry.getValue();
+            final ShardResultBuilder builder = new ShardResultBuilder()
+                    .setDataStoreType(entry.getKey().getDataStoreType())
+                    .setShardName(entry.getKey().getShardName());
+
+            final GetKnownClientsReply reply;
+            try {
+                reply = Futures.getDone(future);
+            } catch (ExecutionException e) {
+                LOG.debug("Shard {} failed to answer", entry.getKey(), e);
+                final ShardResult sr = builder
+                        .setSucceeded(Boolean.FALSE)
+                        .setErrorMessage(e.getCause().getMessage())
+                        .build();
+                result.put(sr.key(), sr);
+                continue;
+            }
+
+            final ShardResult sr = builder
+                    .setSucceeded(Boolean.TRUE)
+                    .addAugmentation(new ShardResult1Builder()
+                        .setKnownClients(reply.getClients().stream()
+                            .map(client -> new KnownClientsBuilder()
+                                .setMember(client.getFrontendId().getMemberName().toYang())
+                                .setType(client.getFrontendId().getClientType().toYang())
+                                .setGeneration(client.getYangGeneration())
+                                .build())
+                            .collect(Collectors.toMap(KnownClients::key, Function.identity())))
+                        .build())
+                    .build();
+
+            result.put(sr.key(), sr);
+        }
+
+        return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build())
+                .build();
+    }
+
     private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
             final List<MemberVotingState> memberVotingStatus) {
         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
@@ -637,19 +707,20 @@ public class ClusterAdminRpcService implements ClusterAdminService {
 
     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
             final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
-            final Function<List<ShardResult>, T> resultDataSupplier,
+            final Function<Map<ShardResultKey, ShardResult>, T> resultDataSupplier,
             final String failureLogMsgPrefix) {
         final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
-        final List<ShardResult> shardResults = new ArrayList<>();
+        final Map<ShardResultKey, ShardResult> shardResults = new HashMap<>();
         for (final Entry<ListenableFuture<Success>, ShardResultBuilder> entry : shardResultData) {
             Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
                 @Override
                 public void onSuccess(final Success result) {
                     synchronized (shardResults) {
-                        ShardResultBuilder shardResult = entry.getValue();
-                        LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
-                                shardResult.getDataStoreType());
-                        shardResults.add(shardResult.setSucceeded(true).build());
+                        final ShardResultBuilder builder = entry.getValue();
+                        LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(),
+                            builder.getDataStoreType());
+                        final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build();
+                        shardResults.put(sr.key(), sr);
                         checkIfComplete();
                     }
                 }
@@ -657,11 +728,14 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 @Override
                 public void onFailure(final Throwable failure) {
                     synchronized (shardResults) {
-                        ShardResultBuilder shardResult = entry.getValue();
-                        LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
-                                shardResult.getDataStoreType(), failure);
-                        shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
-                                Throwables.getRootCause(failure).getMessage()).build());
+                        ShardResultBuilder builder = entry.getValue();
+                        LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(),
+                                builder.getDataStoreType(), failure);
+                        final ShardResult sr = builder
+                                .setSucceeded(Boolean.FALSE)
+                                .setErrorMessage(Throwables.getRootCause(failure).getMessage())
+                                .build();
+                        shardResults.put(sr.key(), sr);
                         checkIfComplete();
                     }
                 }
@@ -741,7 +815,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         final SettableFuture<T> returnFuture = SettableFuture.create();
 
         @SuppressWarnings("unchecked")
-        scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
+        Future<T> askFuture = (Future<T>) Patterns.ask(actor, message, timeout);
         askFuture.onComplete(new OnComplete<T>() {
             @Override
             public void onComplete(final Throwable failure, final T resp) {
@@ -756,6 +830,38 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+    private ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> getAllShardLeadersClients() {
+        final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder =
+                ImmutableMap.builder();
+
+        addAllShardsClients(builder, DataStoreType.Config, configDataStore.getActorUtils());
+        addAllShardsClients(builder, DataStoreType.Operational, operDataStore.getActorUtils());
+
+        return builder.build();
+    }
+
+    private static void addAllShardsClients(
+            final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder,
+            final DataStoreType type, final ActorUtils utils) {
+        for (String shardName : utils.getConfiguration().getAllShardNames()) {
+            final SettableFuture<GetKnownClientsReply> future = SettableFuture.create();
+            builder.put(new ShardIdentifier(type, shardName), future);
+
+            utils.findPrimaryShardAsync(shardName).flatMap(
+                info -> Patterns.ask(info.getPrimaryShardActor(), GetKnownClients.INSTANCE, SHARD_MGR_TIMEOUT),
+                utils.getClientDispatcher()).onComplete(new OnComplete<>() {
+                    @Override
+                    public void onComplete(final Throwable failure, final Object success) {
+                        if (failure == null) {
+                            future.set((GetKnownClientsReply) success);
+                        } else {
+                            future.setException(failure);
+                        }
+                    }
+                }, utils.getClientDispatcher());
+        }
+    }
+
     private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(final String message) {
         return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
     }