Replace deprecated Futures.addCallback by the newer version
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index ab85331c14d57a052a5c84e92713b89a8adaffb1..1bda653fc2ed63814b5706fd0404734bcbfa8bff 100644 (file)
@@ -12,12 +12,14 @@ import akka.actor.Status.Success;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -37,12 +39,15 @@ import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardRepl
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
@@ -58,6 +63,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
@@ -130,7 +141,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
                         returnFuture, failure);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return returnFuture;
     }
@@ -169,7 +180,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 onMessageFailure(String.format("Failed to remove replica for shard %s", shardName),
                         returnFuture, failure);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return returnFuture;
     }
@@ -186,12 +197,13 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
         }
 
-        LOG.info("Moving leader to local node for shard {}, datastoreType {}", shardName, dataStoreType);
-
         ActorContext actorContext = dataStoreType == DataStoreType.Config
                 ? configDataStore.getActorContext()
                 : operDataStore.getActorContext();
 
+        LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
+                actorContext.getCurrentMemberName().getName(), shardName, dataStoreType);
+
         final scala.concurrent.Future<ActorRef> localShardReply =
                 actorContext.findLocalShardAsync(shardName);
 
@@ -222,7 +234,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                     return;
                 }
 
-                LOG.debug("Leadership transfer complete {}.", success);
+                LOG.debug("Leadership transfer complete");
                 future.set(RpcResultBuilder.<Void>success().build());
             }
         }, actorContext.getClientDispatcher());
@@ -260,7 +272,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 onMessageFailure(String.format("Failed to add replica for shard %s", prefix),
                         returnFuture, failure);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return returnFuture;
     }
@@ -302,7 +314,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 onMessageFailure(String.format("Failed to remove replica for shard %s", prefix),
                         returnFuture, failure);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return returnFuture;
     }
@@ -312,7 +324,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         LOG.info("Adding replicas for all shards");
 
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = shardName -> new AddShardReplica(shardName);
+        Function<String, Object> messageSupplier = AddShardReplica::new;
 
         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
@@ -381,7 +393,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
                         returnFuture, failure);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return returnFuture;
     }
@@ -411,8 +423,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     @Override
     public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
-        Function<String, Object> messageSupplier = shardName ->
-                new FlipShardMembersVotingStatus(shardName);
+        Function<String, Object> messageSupplier = FlipShardMembersVotingStatus::new;
 
         LOG.info("Flip member voting states for all shards");
 
@@ -424,6 +435,95 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                 "Failed to change member voting states");
     }
 
+    @Override
+    public Future<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultFuture("A valid shard name must be specified");
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Getting role for shard {}, datastore type {}", shardName, dataStoreType);
+
+        final SettableFuture<RpcResult<GetShardRoleOutput>> returnFuture = SettableFuture.create();
+        ListenableFuture<GetShardRoleReply> future = sendMessageToShardManager(dataStoreType,
+                new GetShardRole(shardName));
+        Futures.addCallback(future, new FutureCallback<GetShardRoleReply>() {
+            @Override
+            public void onSuccess(final GetShardRoleReply reply) {
+                if (reply == null) {
+                    returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
+                            "No Shard role present. Please retry..").build());
+                    return;
+                }
+                LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName);
+                final GetShardRoleOutputBuilder builder = new GetShardRoleOutputBuilder();
+                if (reply.getRole() != null) {
+                    builder.setRole(reply.getRole());
+                }
+                returnFuture.set(newSuccessfulResult(builder.build()));
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
+                        "Failed to get shard role.", failure).build());
+            }
+        }, MoreExecutors.directExecutor());
+
+        return returnFuture;
+    }
+
+    @Override
+    public Future<RpcResult<GetPrefixShardRoleOutput>> getPrefixShardRole(final GetPrefixShardRoleInput input) {
+        final InstanceIdentifier<?> identifier = input.getShardPrefix();
+        if (identifier == null) {
+            return newFailedRpcResultFuture("A valid shard identifier must be specified");
+        }
+
+        final DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Getting prefix shard role for shard: {}, datastore type {}", identifier, dataStoreType);
+
+        final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+        final String shardName = ClusterUtils.getCleanShardName(prefix);
+        final SettableFuture<RpcResult<GetPrefixShardRoleOutput>> returnFuture = SettableFuture.create();
+        ListenableFuture<GetShardRoleReply> future = sendMessageToShardManager(dataStoreType,
+                new GetShardRole(shardName));
+        Futures.addCallback(future, new FutureCallback<GetShardRoleReply>() {
+            @Override
+            public void onSuccess(final GetShardRoleReply reply) {
+                if (reply == null) {
+                    returnFuture.set(ClusterAdminRpcService.<GetPrefixShardRoleOutput>newFailedRpcResultBuilder(
+                            "No Shard role present. Please retry..").build());
+                    return;
+                }
+
+                LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName);
+                final GetPrefixShardRoleOutputBuilder builder = new GetPrefixShardRoleOutputBuilder();
+                if (reply.getRole() != null) {
+                    builder.setRole(reply.getRole());
+                }
+                returnFuture.set(newSuccessfulResult(builder.build()));
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                returnFuture.set(ClusterAdminRpcService.<GetPrefixShardRoleOutput>newFailedRpcResultBuilder(
+                        "Failed to get shard role.", failure).build());
+            }
+        }, MoreExecutors.directExecutor());
+
+        return returnFuture;
+    }
+
     @Override
     public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
         LOG.debug("backupDatastore: {}", input);
@@ -444,7 +544,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             public void onFailure(Throwable failure) {
                 onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
             }
-        });
+        }, MoreExecutors.directExecutor());
 
         return returnFuture;
     }
@@ -455,10 +555,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         for (MemberVotingState memberStatus: memberVotingStatus) {
             serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
         }
-
-        ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
-                serverVotingStatusMap);
-        return changeVotingStatus;
+        return new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap);
     }
 
     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
@@ -498,7 +595,7 @@ public class ClusterAdminRpcService implements ClusterAdminService {
                         returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
                     }
                 }
-            });
+            }, MoreExecutors.directExecutor());
         }
         return returnFuture;
     }
@@ -513,8 +610,8 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName());
 
         for (String shardName: allShardNames) {
-            ListenableFuture<T> future = this.<T>ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
-                    SHARD_MGR_TIMEOUT);
+            ListenableFuture<T> future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
+                                                  SHARD_MGR_TIMEOUT);
             shardResultData.add(new SimpleEntry<>(future,
                     new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
         }
@@ -593,10 +690,10 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     }
 
     private static RpcResult<Void> newSuccessfulResult() {
-        return newSuccessfulResult((Void)null);
+        return newSuccessfulResult(null);
     }
 
     private static <T> RpcResult<T> newSuccessfulResult(T data) {
-        return RpcResultBuilder.<T>success(data).build();
+        return RpcResultBuilder.success(data).build();
     }
 }