X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=ba842961408ce84745f8f59a866098b378deede0;hp=c6aa0dca87228ad81e50d4c3860207d125dee996;hb=769ef0f950f2ed6cfc14d274e6a8edc583a36a96;hpb=d207a2f677545357095f2e5f145a5ecc8d3a60dd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index c6aa0dca87..ba84296140 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.admin; import akka.actor.ActorRef; +import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; @@ -22,6 +23,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; @@ -70,10 +72,31 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl } @Override - public Future> addShardReplica(AddShardReplicaInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); + public Future> addShardReplica(final AddShardReplicaInput input) { + final String shardName = input.getShardName(); + if(Strings.isNullOrEmpty(shardName)) { + return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture(); + } + + LOG.info("Adding replica for shard {}", shardName); + + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture> future = sendMessageToShardManagers(new AddShardReplica(shardName)); + Futures.addCallback(future, new FutureCallback>() { + @Override + public void onSuccess(List snapshots) { + LOG.info("Successfully added replica for shard {}", shardName); + returnFuture.set(newSuccessfulResult()); + } + + @Override + public void onFailure(Throwable failure) { + onMessageFailure(String.format("Failed to add replica for shard %s", shardName), + returnFuture, failure); + } + }); + + return returnFuture; } @Override @@ -112,7 +135,6 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl "Not implemented yet").buildFuture(); } - @SuppressWarnings("unchecked") @Override public Future> backupDatastore(final BackupDatastoreInput input) { LOG.debug("backupDatastore: {}", input); @@ -121,14 +143,9 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture(); } - Timeout timeout = new Timeout(1, TimeUnit.MINUTES); - ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), - GetSnapshot.INSTANCE, timeout); - ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), - GetSnapshot.INSTANCE, timeout); - final SettableFuture> returnFuture = SettableFuture.create(); - Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback>() { + ListenableFuture> future = sendMessageToShardManagers(GetSnapshot.INSTANCE); + Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(List snapshots) { saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); @@ -136,13 +153,22 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl @Override public void onFailure(Throwable failure) { - onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure); + onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); } }); return returnFuture; } + @SuppressWarnings("unchecked") + private ListenableFuture> sendMessageToShardManagers(Object message) { + Timeout timeout = new Timeout(1, TimeUnit.MINUTES); + ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout); + ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout); + + return Futures.allAsList(configFuture, operFuture); + } + private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName, SettableFuture> returnFuture) { try(FileOutputStream fos = new FileOutputStream(fileName)) { @@ -151,15 +177,19 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl returnFuture.set(newSuccessfulResult()); LOG.info("Successfully backed up datastore to file {}", fileName); } catch(Exception e) { - onDatastoreBackupFilure(fileName, returnFuture, e); + onDatastoreBackupFailure(fileName, returnFuture, e); } } - private static void onDatastoreBackupFilure(String fileName, final SettableFuture> returnFuture, + private static void onDatastoreBackupFailure(String fileName, SettableFuture> returnFuture, + Throwable failure) { + onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure); + } + + private static void onMessageFailure(String msg, final SettableFuture> returnFuture, Throwable failure) { - String msg = String.format("Failed to back up datastore to file %s", fileName); LOG.error(msg, failure); - returnFuture.set(newFailedRpcResultBuilder(msg, failure).build()); + returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build()); } private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) {