Fix CS warnings in sal-cluster-admin and and enable enforcement
[controller.git] / opendaylight / md-sal / sal-cluster-admin / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index d52a15483c6161cef8bbfb38ed7eb73fc1185f4c..a55eca1b8363d07f922367d13d3f6eb841cf5070 100644 (file)
@@ -22,7 +22,9 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -31,25 +33,30 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
@@ -63,14 +70,13 @@ import org.slf4j.LoggerFactory;
  *
  * @author Thomas Pantelis
  */
-public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+public class ClusterAdminRpcService implements ClusterAdminService {
     private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
 
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
-    private RpcRegistration<ClusterAdminService> rpcRegistration;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
             DistributedDataStoreInterface operDataStore) {
@@ -78,28 +84,15 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         this.operDataStore = operDataStore;
     }
 
-    public void start(RpcProviderRegistry rpcProviderRegistry) {
-        LOG.debug("ClusterAdminRpcService starting");
-
-        rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
-    }
-
-    @Override
-    public void close() {
-        if(rpcRegistration != null) {
-            rpcRegistration.close();
-        }
-    }
-
     @Override
     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         final String shardName = input.getShardName();
-        if(Strings.isNullOrEmpty(shardName)) {
+        if (Strings.isNullOrEmpty(shardName)) {
             return newFailedRpcResultFuture("A valid shard name must be specified");
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
-        if(dataStoreType == null) {
+        if (dataStoreType == null) {
             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
@@ -127,17 +120,17 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     @Override
     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
         final String shardName = input.getShardName();
-        if(Strings.isNullOrEmpty(shardName)) {
+        if (Strings.isNullOrEmpty(shardName)) {
             return newFailedRpcResultFuture("A valid shard name must be specified");
         }
 
         DataStoreType dataStoreType = input.getDataStoreType();
-        if(dataStoreType == null) {
+        if (dataStoreType == null) {
             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
         final String memberName = input.getMemberName();
-        if(Strings.isNullOrEmpty(memberName)) {
+        if (Strings.isNullOrEmpty(memberName)) {
             return newFailedRpcResultFuture("A valid member name must be specified");
         }
 
@@ -168,22 +161,14 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         LOG.info("Adding replicas for all shards");
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = new Function<String, Object>() {
-            @Override
-            public Object apply(String shardName) {
-                return new AddShardReplica(shardName);
-            }
-        };
+        Function<String, Object> messageSupplier = shardName -> new AddShardReplica(shardName);
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
 
-        return waitForShardResults(shardResultData, new Function<List<ShardResult>, AddReplicasForAllShardsOutput>() {
-            @Override
-            public AddReplicasForAllShardsOutput apply(List<ShardResult> shardResults) {
-                return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build();
-            }
-        }, "Failed to add replica");
+        return waitForShardResults(shardResultData, shardResults ->
+                new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to add replica");
     }
 
 
@@ -192,49 +177,107 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         LOG.info("Removing replicas for all shards");
 
         final String memberName = input.getMemberName();
-        if(Strings.isNullOrEmpty(memberName)) {
+        if (Strings.isNullOrEmpty(memberName)) {
             return newFailedRpcResultFuture("A valid member name must be specified");
         }
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = new Function<String, Object>() {
-            @Override
-            public Object apply(String shardName) {
-                return new RemoveShardReplica(shardName, MemberName.forName(memberName));
-            }
-        };
+        Function<String, Object> messageSupplier = shardName ->
+                new RemoveShardReplica(shardName, MemberName.forName(memberName));
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
 
-        return waitForShardResults(shardResultData, new Function<List<ShardResult>, RemoveAllShardReplicasOutput>() {
+        return waitForShardResults(shardResultData, shardResults ->
+                new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
+        "       Failed to remove replica");
+    }
+
+    @Override
+    public Future<RpcResult<Void>> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) {
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if (memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName,
+                memberVotingStates);
+
+        LOG.info("Change member voting states for shard {}: {}", shardName,
+                changeVotingStatus.getMeberVotingStatusMap());
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, changeVotingStatus);
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(Success success) {
+                LOG.info("Successfully changed member voting states for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
+            }
+
             @Override
-            public RemoveAllShardReplicasOutput apply(List<ShardResult> shardResults) {
-                return new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build();
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
+                        returnFuture, failure);
             }
-        }, "Failed to add replica");
+        });
+
+        return returnFuture;
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
+            final ChangeMemberVotingStatesForAllShardsInput input) {
+        List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
+        if (memberVotingStates == null || memberVotingStates.isEmpty()) {
+            return newFailedRpcResultFuture("No member voting state input was specified");
+        }
+
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = shardName ->
+                toChangeShardMembersVotingStatus(shardName, memberVotingStates);
+
+        LOG.info("Change member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, shardResults ->
+                new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to change member voting states");
     }
 
     @Override
-    public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
-            ConvertMembersToNonvotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
+        final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
+        Function<String, Object> messageSupplier = shardName ->
+                new FlipShardMembersVotingStatus(shardName);
+
+        LOG.info("Flip member voting states for all shards");
+
+        sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
+        sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+
+        return waitForShardResults(shardResultData, shardResults ->
+                new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
+                "Failed to change member voting states");
     }
 
     @Override
     public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
         LOG.debug("backupDatastore: {}", input);
 
-        if(Strings.isNullOrEmpty(input.getFilePath())) {
+        if (Strings.isNullOrEmpty(input.getFilePath())) {
             return newFailedRpcResultFuture("A valid file path must be specified");
         }
 
@@ -255,17 +298,29 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
+    private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
+            List<MemberVotingState> memberVotingStatus) {
+        Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+        for (MemberVotingState memberStatus: memberVotingStatus) {
+            serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
+        }
+
+        ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
+                serverVotingStatusMap);
+        return changeVotingStatus;
+    }
+
     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
             final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
             final Function<List<ShardResult>, T> resultDataSupplier,
             final String failureLogMsgPrefix) {
         final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
         final List<ShardResult> shardResults = new ArrayList<>();
-        for(final Entry<ListenableFuture<Success>, ShardResultBuilder> entry: shardResultData) {
+        for (final Entry<ListenableFuture<Success>, ShardResultBuilder> entry : shardResultData) {
             Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
                 @Override
                 public void onSuccess(Success result) {
-                    synchronized(shardResults) {
+                    synchronized (shardResults) {
                         ShardResultBuilder shardResult = entry.getValue();
                         LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
                                 shardResult.getDataStoreType());
@@ -275,20 +330,20 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
                 }
 
                 @Override
-                public void onFailure(Throwable t) {
-                    synchronized(shardResults) {
+                public void onFailure(Throwable failure) {
+                    synchronized (shardResults) {
                         ShardResultBuilder shardResult = entry.getValue();
                         LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
-                                shardResult.getDataStoreType(), t);
+                                shardResult.getDataStoreType(), failure);
                         shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
-                                Throwables.getRootCause(t).getMessage()).build());
+                                Throwables.getRootCause(failure).getMessage()).build());
                         checkIfComplete();
                     }
                 }
 
                 void checkIfComplete() {
                     LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size());
-                    if(shardResults.size() == shardResultData.size()) {
+                    if (shardResults.size() == shardResultData.size()) {
                         returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
                     }
                 }
@@ -300,16 +355,16 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     private <T> void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType,
             List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
             Function<String, Object> messageSupplier) {
-        ActorContext actorContext = dataStoreType == DataStoreType.Config ?
-                configDataStore.getActorContext() : operDataStore.getActorContext();
+        ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext()
+                : operDataStore.getActorContext();
         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
 
         LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName());
 
-        for(String shardName: allShardNames) {
+        for (String shardName: allShardNames) {
             ListenableFuture<T> future = this.<T>ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
                     SHARD_MGR_TIMEOUT);
-            shardResultData.add(new SimpleEntry<ListenableFuture<T>, ShardResultBuilder>(future,
+            shardResultData.add(new SimpleEntry<>(future,
                     new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
         }
     }
@@ -324,19 +379,21 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     private <T> ListenableFuture<T> sendMessageToShardManager(DataStoreType dataStoreType, Object message) {
-        ActorRef shardManager = dataStoreType == DataStoreType.Config ?
-                configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager();
+        ActorRef shardManager = dataStoreType == DataStoreType.Config
+                ? configDataStore.getActorContext().getShardManager()
+                        : operDataStore.getActorContext().getShardManager();
         return ask(shardManager, message, SHARD_MGR_TIMEOUT);
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
             SettableFuture<RpcResult<Void>> returnFuture) {
-        try(FileOutputStream fos = new FileOutputStream(fileName)) {
+        try (FileOutputStream fos = new FileOutputStream(fileName)) {
             SerializationUtils.serialize(snapshots, fos);
 
             returnFuture.set(newSuccessfulResult());
             LOG.info("Successfully backed up datastore to file {}", fileName);
-        } catch(Exception e) {
+        } catch (Exception e) {
             onDatastoreBackupFailure(fileName, returnFuture, e);
         }
     }
@@ -361,7 +418,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         askFuture.onComplete(new OnComplete<T>() {
             @Override
             public void onComplete(Throwable failure, T resp) {
-                if(failure != null) {
+                if (failure != null) {
                     returnFuture.setException(failure);
                 } else {
                     returnFuture.set(resp);