X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=976443fe20da2f9ec66716733dc8f44520c400ac;hb=e65e1755bd770cf03d3ea15edda9b9cc7a79f3c0;hp=a386c350bb7d5b6d7da5477e1677556477f5512f;hpb=92edfd0e7e15de0f3b8ad089c45ea91812fae867;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index a386c350bb..976443fe20 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -8,31 +8,45 @@ package org.opendaylight.controller.cluster.datastore.admin; import akka.actor.ActorRef; +import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; +import com.google.common.base.Function; import com.google.common.base.Strings; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.io.FileOutputStream; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; @@ -45,6 +59,8 @@ import org.slf4j.LoggerFactory; * @author Thomas Pantelis */ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable { + private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); + private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); private final DistributedDataStore configDataStore; @@ -70,10 +86,36 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl } @Override - public Future> addShardReplica(AddShardReplicaInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); + public Future> addShardReplica(final AddShardReplicaInput input) { + final String shardName = input.getShardName(); + if(Strings.isNullOrEmpty(shardName)) { + return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture(); + } + + DataStoreType dataStoreType = input.getDataStoreType(); + if(dataStoreType == null) { + return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture(); + } + + LOG.info("Adding replica for shard {}", shardName); + + final SettableFuture> returnFuture = SettableFuture.create(); + ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Success success) { + LOG.info("Successfully added replica for shard {}", shardName); + returnFuture.set(newSuccessfulResult()); + } + + @Override + public void onFailure(Throwable failure) { + onMessageFailure(String.format("Failed to add replica for shard %s", shardName), + returnFuture, failure); + } + }); + + return returnFuture; } @Override @@ -84,12 +126,29 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl } @Override - public Future> addReplicasForAllShards() { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); + public Future> addReplicasForAllShards() { + LOG.info("Adding replicas for all shards"); + + final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); + Function messageSupplier = new Function() { + @Override + public Object apply(String shardName) { + return new AddShardReplica(shardName); + } + }; + + sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); + sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); + + return waitForShardResults(shardResultData, new Function, AddReplicasForAllShardsOutput>() { + @Override + public AddReplicasForAllShardsOutput apply(List shardResults) { + return new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(); + } + }, "Failed to add replica"); } + @Override public Future> removeAllShardReplicas() { // TODO implement @@ -112,7 +171,6 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl "Not implemented yet").buildFuture(); } - @SuppressWarnings("unchecked") @Override public Future> backupDatastore(final BackupDatastoreInput input) { LOG.debug("backupDatastore: {}", input); @@ -121,29 +179,98 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture(); } - Timeout timeout = new Timeout(1, TimeUnit.MINUTES); - ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), - GetSnapshot.INSTANCE, timeout); - ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), - GetSnapshot.INSTANCE, timeout); - final SettableFuture> returnFuture = SettableFuture.create(); - Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback>() { + ListenableFuture> future = sendMessageToShardManagers(GetSnapshot.INSTANCE); + Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(List snapshots) { - saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture); + saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); } @Override public void onFailure(Throwable failure) { - onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure); + onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); } }); return returnFuture; } - private static void saveSnapshotsToFile(ArrayList snapshots, String fileName, + private static SettableFuture> waitForShardResults( + final List, ShardResultBuilder>> shardResultData, + final Function, T> resultDataSupplier, + final String failureLogMsgPrefix) { + final SettableFuture> returnFuture = SettableFuture.create(); + final List shardResults = new ArrayList<>(); + for(final Entry, ShardResultBuilder> entry: shardResultData) { + Futures.addCallback(entry.getKey(), new FutureCallback() { + @Override + public void onSuccess(Success result) { + synchronized(shardResults) { + ShardResultBuilder shardResult = entry.getValue(); + LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(), + shardResult.getDataStoreType()); + shardResults.add(shardResult.setSucceeded(true).build()); + checkIfComplete(); + } + } + + @Override + public void onFailure(Throwable t) { + synchronized(shardResults) { + ShardResultBuilder shardResult = entry.getValue(); + LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(), + shardResult.getDataStoreType(), t); + shardResults.add(shardResult.setSucceeded(false).setErrorMessage( + Throwables.getRootCause(t).getMessage()).build()); + checkIfComplete(); + } + } + + void checkIfComplete() { + LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size()); + if(shardResults.size() == shardResultData.size()) { + returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults))); + } + } + }); + } + return returnFuture; + } + + private void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType, + List, ShardResultBuilder>> shardResultData, + Function messageSupplier) { + ActorContext actorContext = dataStoreType == DataStoreType.Config ? + configDataStore.getActorContext() : operDataStore.getActorContext(); + Set allShardNames = actorContext.getConfiguration().getAllShardNames(); + + LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreType()); + + for(String shardName: allShardNames) { + ListenableFuture future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName), + SHARD_MGR_TIMEOUT); + shardResultData.add(new SimpleEntry, ShardResultBuilder>(future, + new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType))); + } + } + + @SuppressWarnings("unchecked") + private ListenableFuture> sendMessageToShardManagers(Object message) { + Timeout timeout = SHARD_MGR_TIMEOUT; + ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout); + ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout); + + return Futures.allAsList(configFuture, operFuture); + } + + private ListenableFuture sendMessageToShardManager(DataStoreType dataStoreType, Object message) { + ActorRef shardManager = dataStoreType == DataStoreType.Config ? + configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager(); + return ask(shardManager, message, SHARD_MGR_TIMEOUT); + } + + private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName, SettableFuture> returnFuture) { try(FileOutputStream fos = new FileOutputStream(fileName)) { SerializationUtils.serialize(snapshots, fos); @@ -151,15 +278,19 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl returnFuture.set(newSuccessfulResult()); LOG.info("Successfully backed up datastore to file {}", fileName); } catch(Exception e) { - onDatastoreBackupFilure(fileName, returnFuture, e); + onDatastoreBackupFailure(fileName, returnFuture, e); } } - private static void onDatastoreBackupFilure(String fileName, final SettableFuture> returnFuture, + private static void onDatastoreBackupFailure(String fileName, SettableFuture> returnFuture, + Throwable failure) { + onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure); + } + + private static void onMessageFailure(String msg, final SettableFuture> returnFuture, Throwable failure) { - String msg = String.format("Failed to back up datastore to file %s", fileName); LOG.error(msg, failure); - returnFuture.set(newFailedRpcResultBuilder(msg, failure).build()); + returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build()); } private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) { @@ -190,6 +321,10 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl } private static RpcResult newSuccessfulResult() { - return RpcResultBuilder.success().build(); + return newSuccessfulResult((Void)null); + } + + private static RpcResult newSuccessfulResult(T data) { + return RpcResultBuilder.success(data).build(); } }