Implement cluster admin RPCs to change member voting states
[controller.git] / opendaylight / md-sal / sal-cluster-admin / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index d52a15483c6161cef8bbfb38ed7eb73fc1185f4c..617120a0489aa459ae3d78abf8a7759f6fa91fab 100644 (file)
@@ -22,7 +22,9 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -31,25 +33,30 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
@@ -63,14 +70,13 @@ import org.slf4j.LoggerFactory;
  *
  * @author Thomas Pantelis
  */
-public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+public class ClusterAdminRpcService implements ClusterAdminService {
     private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
 
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
-    private RpcRegistration<ClusterAdminService> rpcRegistration;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
             DistributedDataStoreInterface operDataStore) {
@@ -78,19 +84,6 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         this.operDataStore = operDataStore;
     }
 
-    public void start(RpcProviderRegistry rpcProviderRegistry) {
-        LOG.debug("ClusterAdminRpcService starting");
-
-        rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
-    }
-
-    @Override
-    public void close() {
-        if(rpcRegistration != null) {
-            rpcRegistration.close();
-        }
-    }
-
     @Override
     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         final String shardName = input.getShardName();
@@ -168,22 +161,14 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         LOG.info("Adding replicas for all shards");
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = new Function<String, Object>() {
-            @Override
-            public Object apply(String shardName) {
-                return new AddShardReplica(shardName);
-            }
-        };
+        Function<String, Object> messageSupplier = shardName -> new AddShardReplica(shardName);
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
 
-        return waitForShardResults(shardResultData, new Function<List<ShardResult>, AddReplicasForAllShardsOutput>() {
-            @Override
-            public AddReplicasForAllShardsOutput apply(List<ShardResult> shardResults) {
-                return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build();
-            }
-        }, "Failed to add replica");
+        return waitForShardResults(shardResultData, shardResults ->
+                new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to add replica");
     }
 
 
@@ -197,37 +182,95 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         }
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = new Function<String, Object>() {
-            @Override
-            public Object apply(String shardName) {
-                return new RemoveShardReplica(shardName, MemberName.forName(memberName));
-            }
-        };
+        Function<String, Object> messageSupplier = shardName ->
+                new RemoveShardReplica(shardName, MemberName.forName(memberName));
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
 
-        return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+        return waitForShardResults(shardResultData, shardResults ->
+                new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
+        "       Failed to remove replica");
+    }
+
+    @Override
+    public Future<RpcResult<Void>> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) {
+        final String shardName = input.getShardName();
+        if(Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if(dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName,
+                memberVotingStates);
+
+        LOG.info("Change member voting states for shard {}: {}", shardName,
+                changeVotingStatus.getMeberVotingStatusMap());
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, changeVotingStatus);
+        Futures.addCallback(future, new FutureCallback<Success>() {
             @Override
-            public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
-                return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+            public void onSuccess(Success success) {
+                LOG.info("Successfully changed member voting states for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
             }
-        }, "Failed to add replica");
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
+            final ChangeMemberVotingStatesForAllShardsInput input) {
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if(memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = shardName ->
+                toChangeShardMembersVotingStatus(shardName, memberVotingStates);
+
+        LOG.info("Change member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, shardResults ->
+                new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to change member voting states");
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
-            ConvertMembersToNonvotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = shardName ->
+                new FlipShardMembersVotingStatus(shardName);
+
+        LOG.info("Flip member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, shardResults ->
+                new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to change member voting states");
     }
 
     @Override
@@ -255,6 +298,18 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
+    private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
+            List<MemberVotingState> memberVotingStatus) {
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for(MemberVotingState memberStatus: memberVotingStatus) {
+            serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
+                serverVotingStatusMap);
+        return changeVotingStatus;
+    }
+
     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
             final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
             final Function<List<ShardResult>, T> resultDataSupplier,