package org.opendaylight.controller.cluster.datastore.admin;
import akka.actor.ActorRef;
+import akka.actor.Status.Success;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.FileOutputStream;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
}
@Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
+ public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
+ final String shardName = input.getShardName();
+ if(Strings.isNullOrEmpty(shardName)) {
+ return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+ }
+
+ DataStoreType dataStoreType = input.getDataStoreType();
+ if(dataStoreType == null) {
+ return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+ }
+
+ LOG.info("Adding replica for shard {}", shardName);
+
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName));
+ Futures.addCallback(future, new FutureCallback<Success>() {
+ @Override
+ public void onSuccess(Success success) {
+ LOG.info("Successfully added replica for shard {}", shardName);
+ returnFuture.set(newSuccessfulResult());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
}
@Override
"Not implemented yet").buildFuture();
}
- @SuppressWarnings("unchecked")
@Override
public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
LOG.debug("backupDatastore: {}", input);
return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
}
- Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
- ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
- GetSnapshot.INSTANCE, timeout);
- ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
- GetSnapshot.INSTANCE, timeout);
-
final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
- Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
+ ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
+ Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
@Override
public void onSuccess(List<DatastoreSnapshot> snapshots) {
- saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+ saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
}
@Override
public void onFailure(Throwable failure) {
- onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
+ onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
}
});
return returnFuture;
}
- private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+ @SuppressWarnings("unchecked")
+ private <T> ListenableFuture<List<T>> sendMessageToShardManagers(Object message) {
+ Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+ ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
+ ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+
+ return Futures.allAsList(configFuture, operFuture);
+ }
+
+ private <T> ListenableFuture<T> sendMessageToShardManager(DataStoreType dataStoreType, Object message) {
+ ActorRef shardManager = dataStoreType == DataStoreType.Config ?
+ configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager();
+ return ask(shardManager, message, new Timeout(1, TimeUnit.MINUTES));
+ }
+
+ private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
SettableFuture<RpcResult<Void>> returnFuture) {
try(FileOutputStream fos = new FileOutputStream(fileName)) {
SerializationUtils.serialize(snapshots, fos);
returnFuture.set(newSuccessfulResult());
LOG.info("Successfully backed up datastore to file {}", fileName);
} catch(Exception e) {
- onDatastoreBackupFilure(fileName, returnFuture, e);
+ onDatastoreBackupFailure(fileName, returnFuture, e);
}
}
- private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
+ private static void onDatastoreBackupFailure(String fileName, SettableFuture<RpcResult<Void>> returnFuture,
+ Throwable failure) {
+ onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
+ }
+
+ private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
Throwable failure) {
- String msg = String.format("Failed to back up datastore to file %s", fileName);
LOG.error(msg, failure);
- returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
+ returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
}
private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {