Bug 2187: Implement add-shard-replica RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index c6aa0dc..ba84296 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.admin;
 
 import akka.actor.ActorRef;
+import akka.actor.Status.Success;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -22,6 +23,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
@@ -70,10 +72,31 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
+        final String shardName = input.getShardName();
+        if(Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+        }
+
+        LOG.info("Adding replica for shard {}", shardName);
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<List<Success>> future = sendMessageToShardManagers(new AddShardReplica(shardName));
+        Futures.addCallback(future, new FutureCallback<List<Success>>() {
+            @Override
+            public void onSuccess(List<Success> snapshots) {
+                LOG.info("Successfully added replica for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
     }
 
     @Override
@@ -112,7 +135,6 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
                 "Not implemented yet").buildFuture();
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
         LOG.debug("backupDatastore: {}", input);
@@ -121,14 +143,9 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
             return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
         }
 
-        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
-        ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
-                GetSnapshot.INSTANCE, timeout);
-        ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
-                GetSnapshot.INSTANCE, timeout);
-
         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
-        Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
+        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
+        Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
             @Override
             public void onSuccess(List<DatastoreSnapshot> snapshots) {
                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
@@ -136,13 +153,22 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
 
             @Override
             public void onFailure(Throwable failure) {
-                onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
+                onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
             }
         });
 
         return returnFuture;
     }
 
+    @SuppressWarnings("unchecked")
+    private <T> ListenableFuture<List<T>> sendMessageToShardManagers(Object message) {
+        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+        ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
+        ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+
+        return Futures.allAsList(configFuture, operFuture);
+    }
+
     private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
             SettableFuture<RpcResult<Void>> returnFuture) {
         try(FileOutputStream fos = new FileOutputStream(fileName)) {
@@ -151,15 +177,19 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
             returnFuture.set(newSuccessfulResult());
             LOG.info("Successfully backed up datastore to file {}", fileName);
         } catch(Exception e) {
-            onDatastoreBackupFilure(fileName, returnFuture, e);
+            onDatastoreBackupFailure(fileName, returnFuture, e);
         }
     }
 
-    private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
+    private static void onDatastoreBackupFailure(String fileName, SettableFuture<RpcResult<Void>> returnFuture,
+            Throwable failure) {
+        onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
+    }
+
+    private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
             Throwable failure) {
-        String msg = String.format("Failed to back up datastore to file %s", fileName);
         LOG.error(msg, failure);
-        returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
+        returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
     }
 
     private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {