package org.opendaylight.controller.cluster.datastore.admin;
import akka.actor.ActorRef;
+import akka.actor.Status.Success;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.FileOutputStream;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
}
@Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
+ public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
+ final String shardName = input.getShardName();
+ if(Strings.isNullOrEmpty(shardName)) {
+ return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+ }
+
+ LOG.info("Adding replica for shard {}", shardName);
+
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ ListenableFuture<List<Success>> future = sendMessageToShardManagers(new AddShardReplica(shardName));
+ Futures.addCallback(future, new FutureCallback<List<Success>>() {
+ @Override
+ public void onSuccess(List<Success> snapshots) {
+ LOG.info("Successfully added replica for shard {}", shardName);
+ returnFuture.set(newSuccessfulResult());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
}
@Override
"Not implemented yet").buildFuture();
}
- @SuppressWarnings("unchecked")
@Override
public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
LOG.debug("backupDatastore: {}", input);
return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
}
- Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
- ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
- GetSnapshot.INSTANCE, timeout);
- ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
- GetSnapshot.INSTANCE, timeout);
-
final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
- Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
+ ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
+ Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
@Override
public void onSuccess(List<DatastoreSnapshot> snapshots) {
- saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+ saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
}
@Override
public void onFailure(Throwable failure) {
- onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
+ onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
}
});
return returnFuture;
}
- private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+ @SuppressWarnings("unchecked")
+ private <T> ListenableFuture<List<T>> sendMessageToShardManagers(Object message) {
+ Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+ ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
+ ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+
+ return Futures.allAsList(configFuture, operFuture);
+ }
+
+ private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
SettableFuture<RpcResult<Void>> returnFuture) {
try(FileOutputStream fos = new FileOutputStream(fileName)) {
SerializationUtils.serialize(snapshots, fos);
returnFuture.set(newSuccessfulResult());
LOG.info("Successfully backed up datastore to file {}", fileName);
} catch(Exception e) {
- onDatastoreBackupFilure(fileName, returnFuture, e);
+ onDatastoreBackupFailure(fileName, returnFuture, e);
}
}
- private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
+ private static void onDatastoreBackupFailure(String fileName, SettableFuture<RpcResult<Void>> returnFuture,
+ Throwable failure) {
+ onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
+ }
+
+ private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
Throwable failure) {
- String msg = String.format("Failed to back up datastore to file %s", fileName);
LOG.error(msg, failure);
- returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
+ returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
}
private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {