import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
private final BindingNormalizedNodeSerializer serializer;
private final Timeout makeLeaderLocalTimeout;
- public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
- DistributedDataStoreInterface operDataStore,
- BindingNormalizedNodeSerializer serializer) {
+ public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore,
+ final DistributedDataStoreInterface operDataStore,
+ final BindingNormalizedNodeSerializer serializer) {
this.configDataStore = configDataStore;
this.operDataStore = operDataStore;
this.serializer = serializer;
this.makeLeaderLocalTimeout =
- new Timeout(configDataStore.getActorContext().getDatastoreContext()
+ new Timeout(configDataStore.getActorUtils().getDatastoreContext()
.getShardLeaderElectionTimeout().duration().$times(2));
}
@Override
- public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
+ public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
final String shardName = input.getShardName();
if (Strings.isNullOrEmpty(shardName)) {
return newFailedRpcResultFuture("A valid shard name must be specified");
LOG.info("Adding replica for shard {}", shardName);
- final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final SettableFuture<RpcResult<AddShardReplicaOutput>> returnFuture = SettableFuture.create();
ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName));
Futures.addCallback(future, new FutureCallback<Success>() {
@Override
- public void onSuccess(Success success) {
+ public void onSuccess(final Success success) {
LOG.info("Successfully added replica for shard {}", shardName);
- returnFuture.set(newSuccessfulResult());
+ returnFuture.set(newSuccessfulResult(new AddShardReplicaOutputBuilder().build()));
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
returnFuture, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+ public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
+ final RemoveShardReplicaInput input) {
final String shardName = input.getShardName();
if (Strings.isNullOrEmpty(shardName)) {
return newFailedRpcResultFuture("A valid shard name must be specified");
LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
- final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final SettableFuture<RpcResult<RemoveShardReplicaOutput>> returnFuture = SettableFuture.create();
ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
new RemoveShardReplica(shardName, MemberName.forName(memberName)));
Futures.addCallback(future, new FutureCallback<Success>() {
@Override
- public void onSuccess(Success success) {
+ public void onSuccess(final Success success) {
LOG.info("Successfully removed replica for shard {}", shardName);
- returnFuture.set(newSuccessfulResult());
+ returnFuture.set(newSuccessfulResult(new RemoveShardReplicaOutputBuilder().build()));
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
onMessageFailure(String.format("Failed to remove replica for shard %s", shardName),
returnFuture, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<Void>> makeLeaderLocal(final MakeLeaderLocalInput input) {
+ public ListenableFuture<RpcResult<MakeLeaderLocalOutput>> makeLeaderLocal(final MakeLeaderLocalInput input) {
final String shardName = input.getShardName();
if (Strings.isNullOrEmpty(shardName)) {
return newFailedRpcResultFuture("A valid shard name must be specified");
return newFailedRpcResultFuture("A valid DataStoreType must be specified");
}
- ActorContext actorContext = dataStoreType == DataStoreType.Config
- ? configDataStore.getActorContext()
- : operDataStore.getActorContext();
+ ActorUtils actorUtils = dataStoreType == DataStoreType.Config
+ ? configDataStore.getActorUtils() : operDataStore.getActorUtils();
LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
- actorContext.getCurrentMemberName().getName(), shardName, dataStoreType);
+ actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
final scala.concurrent.Future<ActorRef> localShardReply =
- actorContext.findLocalShardAsync(shardName);
+ actorUtils.findLocalShardAsync(shardName);
final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
localShardReply.onComplete(new OnComplete<ActorRef>() {
@Override
- public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+ public void onComplete(final Throwable failure, final ActorRef actorRef) {
if (failure != null) {
LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
- + " local shard.", shardName, failure);
+ + " local shard.", shardName, dataStoreType, failure);
makeLeaderLocalAsk.failure(failure);
} else {
makeLeaderLocalAsk
- .completeWith(actorContext
+ .completeWith(actorUtils
.executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
}
}
- }, actorContext.getClientDispatcher());
+ }, actorUtils.getClientDispatcher());
- final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+ final SettableFuture<RpcResult<MakeLeaderLocalOutput>> future = SettableFuture.create();
makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ public void onComplete(final Throwable failure, final Object success) {
if (failure != null) {
LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
- future.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
+ future.set(RpcResultBuilder.<MakeLeaderLocalOutput>failed().withError(ErrorType.APPLICATION,
"leadership transfer failed", failure).build());
return;
}
LOG.debug("Leadership transfer complete");
- future.set(RpcResultBuilder.<Void>success().build());
+ future.set(RpcResultBuilder.success(new MakeLeaderLocalOutputBuilder().build()).build());
}
- }, actorContext.getClientDispatcher());
+ }, actorUtils.getClientDispatcher());
return future;
}
@Override
- public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {
+ public ListenableFuture<RpcResult<AddPrefixShardReplicaOutput>> addPrefixShardReplica(
+ final AddPrefixShardReplicaInput input) {
final InstanceIdentifier<?> identifier = input.getShardPrefix();
if (identifier == null) {
LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType);
final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
- final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final SettableFuture<RpcResult<AddPrefixShardReplicaOutput>> returnFuture = SettableFuture.create();
ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix));
Futures.addCallback(future, new FutureCallback<Success>() {
@Override
- public void onSuccess(Success success) {
+ public void onSuccess(final Success success) {
LOG.info("Successfully added replica for shard {}", prefix);
- returnFuture.set(newSuccessfulResult());
+ returnFuture.set(newSuccessfulResult(new AddPrefixShardReplicaOutputBuilder().build()));
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
onMessageFailure(String.format("Failed to add replica for shard %s", prefix),
returnFuture, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<Void>> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) {
+ public ListenableFuture<RpcResult<RemovePrefixShardReplicaOutput>> removePrefixShardReplica(
+ final RemovePrefixShardReplicaInput input) {
final InstanceIdentifier<?> identifier = input.getShardPrefix();
if (identifier == null) {
identifier, memberName, dataStoreType);
final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
- final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final SettableFuture<RpcResult<RemovePrefixShardReplicaOutput>> returnFuture = SettableFuture.create();
final ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
new RemovePrefixShardReplica(prefix, MemberName.forName(memberName)));
Futures.addCallback(future, new FutureCallback<Success>() {
@Override
public void onSuccess(final Success success) {
LOG.info("Successfully removed replica for shard {}", prefix);
- returnFuture.set(newSuccessfulResult());
+ returnFuture.set(newSuccessfulResult(new RemovePrefixShardReplicaOutputBuilder().build()));
}
@Override
onMessageFailure(String.format("Failed to remove replica for shard %s", prefix),
returnFuture, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards() {
+ public ListenableFuture<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards(
+ final AddReplicasForAllShardsInput input) {
LOG.info("Adding replicas for all shards");
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = shardName -> new AddShardReplica(shardName);
- sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
- sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new);
return waitForShardResults(shardResultData, shardResults ->
new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
@Override
- public Future<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(RemoveAllShardReplicasInput input) {
+ public ListenableFuture<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(
+ final RemoveAllShardReplicasInput input) {
LOG.info("Removing replicas for all shards");
final String memberName = input.getMemberName();
}
@Override
- public Future<RpcResult<Void>> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) {
+ public ListenableFuture<RpcResult<ChangeMemberVotingStatesForShardOutput>> changeMemberVotingStatesForShard(
+ final ChangeMemberVotingStatesForShardInput input) {
final String shardName = input.getShardName();
if (Strings.isNullOrEmpty(shardName)) {
return newFailedRpcResultFuture("A valid shard name must be specified");
LOG.info("Change member voting states for shard {}: {}", shardName,
changeVotingStatus.getMeberVotingStatusMap());
- final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final SettableFuture<RpcResult<ChangeMemberVotingStatesForShardOutput>> returnFuture = SettableFuture.create();
ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, changeVotingStatus);
Futures.addCallback(future, new FutureCallback<Success>() {
@Override
- public void onSuccess(Success success) {
+ public void onSuccess(final Success success) {
LOG.info("Successfully changed member voting states for shard {}", shardName);
- returnFuture.set(newSuccessfulResult());
+ returnFuture.set(newSuccessfulResult(new ChangeMemberVotingStatesForShardOutputBuilder().build()));
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
returnFuture, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
+ public ListenableFuture<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
final ChangeMemberVotingStatesForAllShardsInput input) {
List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
if (memberVotingStates == null || memberVotingStates.isEmpty()) {
}
@Override
- public Future<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards() {
+ public ListenableFuture<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards(
+ final FlipMemberVotingStatesForAllShardsInput input) {
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = shardName ->
- new FlipShardMembersVotingStatus(shardName);
+ Function<String, Object> messageSupplier = FlipShardMembersVotingStatus::new;
LOG.info("Flip member voting states for all shards");
}
@Override
- public Future<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
+ public ListenableFuture<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
final String shardName = input.getShardName();
if (Strings.isNullOrEmpty(shardName)) {
return newFailedRpcResultFuture("A valid shard name must be specified");
returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
"Failed to get shard role.", failure).build());
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<GetPrefixShardRoleOutput>> getPrefixShardRole(final GetPrefixShardRoleInput input) {
+ public ListenableFuture<RpcResult<GetPrefixShardRoleOutput>> getPrefixShardRole(
+ final GetPrefixShardRoleInput input) {
final InstanceIdentifier<?> identifier = input.getShardPrefix();
if (identifier == null) {
return newFailedRpcResultFuture("A valid shard identifier must be specified");
returnFuture.set(ClusterAdminRpcService.<GetPrefixShardRoleOutput>newFailedRpcResultBuilder(
"Failed to get shard role.", failure).build());
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
@Override
- public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
+ public ListenableFuture<RpcResult<BackupDatastoreOutput>> backupDatastore(final BackupDatastoreInput input) {
LOG.debug("backupDatastore: {}", input);
if (Strings.isNullOrEmpty(input.getFilePath())) {
return newFailedRpcResultFuture("A valid file path must be specified");
}
- final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
@Override
- public void onSuccess(List<DatastoreSnapshot> snapshots) {
+ public void onSuccess(final List<DatastoreSnapshot> snapshots) {
saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
- private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
- List<MemberVotingState> memberVotingStatus) {
+ private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
+ final List<MemberVotingState> memberVotingStatus) {
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
for (MemberVotingState memberStatus: memberVotingStatus) {
serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting());
}
-
- ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName,
- serverVotingStatusMap);
- return changeVotingStatus;
+ return new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap);
}
private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
for (final Entry<ListenableFuture<Success>, ShardResultBuilder> entry : shardResultData) {
Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
@Override
- public void onSuccess(Success result) {
+ public void onSuccess(final Success result) {
synchronized (shardResults) {
ShardResultBuilder shardResult = entry.getValue();
LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
synchronized (shardResults) {
ShardResultBuilder shardResult = entry.getValue();
LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
}
}
- });
+ }, MoreExecutors.directExecutor());
}
return returnFuture;
}
- private <T> void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType,
- List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
- Function<String, Object> messageSupplier) {
- ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext()
- : operDataStore.getActorContext();
- Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+ private <T> void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType,
+ final List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
+ final Function<String, Object> messageSupplier) {
+ ActorUtils actorUtils = dataStoreType == DataStoreType.Config ? configDataStore.getActorUtils()
+ : operDataStore.getActorUtils();
+ Set<String> allShardNames = actorUtils.getConfiguration().getAllShardNames();
- LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName());
+ LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName());
for (String shardName: allShardNames) {
- ListenableFuture<T> future = this.<T>ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
- SHARD_MGR_TIMEOUT);
+ ListenableFuture<T> future = this.ask(actorUtils.getShardManager(), messageSupplier.apply(shardName),
+ SHARD_MGR_TIMEOUT);
shardResultData.add(new SimpleEntry<>(future,
new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
}
}
- @SuppressWarnings("unchecked")
- private <T> ListenableFuture<List<T>> sendMessageToShardManagers(Object message) {
+ private <T> ListenableFuture<List<T>> sendMessageToShardManagers(final Object message) {
Timeout timeout = SHARD_MGR_TIMEOUT;
- ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
- ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+ ListenableFuture<T> configFuture = ask(configDataStore.getActorUtils().getShardManager(), message, timeout);
+ ListenableFuture<T> operFuture = ask(operDataStore.getActorUtils().getShardManager(), message, timeout);
return Futures.allAsList(configFuture, operFuture);
}
- private <T> ListenableFuture<T> sendMessageToShardManager(DataStoreType dataStoreType, Object message) {
+ private <T> ListenableFuture<T> sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {
ActorRef shardManager = dataStoreType == DataStoreType.Config
- ? configDataStore.getActorContext().getShardManager()
- : operDataStore.getActorContext().getShardManager();
+ ? configDataStore.getActorUtils().getShardManager()
+ : operDataStore.getActorUtils().getShardManager();
return ask(shardManager, message, SHARD_MGR_TIMEOUT);
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
@SuppressWarnings("checkstyle:IllegalCatch")
- private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
- SettableFuture<RpcResult<Void>> returnFuture) {
+ private static void saveSnapshotsToFile(final DatastoreSnapshotList snapshots, final String fileName,
+ final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture) {
try (FileOutputStream fos = new FileOutputStream(fileName)) {
SerializationUtils.serialize(snapshots, fos);
- returnFuture.set(newSuccessfulResult());
+ returnFuture.set(newSuccessfulResult(new BackupDatastoreOutputBuilder().build()));
LOG.info("Successfully backed up datastore to file {}", fileName);
} catch (IOException | RuntimeException e) {
onDatastoreBackupFailure(fileName, returnFuture, e);
}
}
- private static void onDatastoreBackupFailure(String fileName, SettableFuture<RpcResult<Void>> returnFuture,
- Throwable failure) {
+ private static <T> void onDatastoreBackupFailure(final String fileName,
+ final SettableFuture<RpcResult<T>> returnFuture, final Throwable failure) {
onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
}
- private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
- Throwable failure) {
- LOG.error(msg, failure);
- returnFuture.set(ClusterAdminRpcService.<Void>newFailedRpcResultBuilder(String.format("%s: %s", msg,
+ @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT")
+ private static <T> void onMessageFailure(final String msg, final SettableFuture<RpcResult<T>> returnFuture,
+ final Throwable failure) {
+ LOG.error("{}", msg, failure);
+ returnFuture.set(ClusterAdminRpcService.<T>newFailedRpcResultBuilder(String.format("%s: %s", msg,
failure.getMessage())).build());
}
- private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
+ private <T> ListenableFuture<T> ask(final ActorRef actor, final Object message, final Timeout timeout) {
final SettableFuture<T> returnFuture = SettableFuture.create();
@SuppressWarnings("unchecked")
scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
askFuture.onComplete(new OnComplete<T>() {
@Override
- public void onComplete(Throwable failure, T resp) {
+ public void onComplete(final Throwable failure, final T resp) {
if (failure != null) {
returnFuture.setException(failure);
} else {
returnFuture.set(resp);
}
}
- }, configDataStore.getActorContext().getClientDispatcher());
+ }, configDataStore.getActorUtils().getClientDispatcher());
return returnFuture;
}
- private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(String message) {
+ private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(final String message) {
return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
}
- private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message) {
+ private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(final String message) {
return newFailedRpcResultBuilder(message, null);
}
- private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(String message, Throwable cause) {
+ private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(final String message, final Throwable cause) {
return RpcResultBuilder.<T>failed().withError(ErrorType.RPC, message, cause);
}
- private static RpcResult<Void> newSuccessfulResult() {
- return newSuccessfulResult((Void)null);
- }
-
- private static <T> RpcResult<T> newSuccessfulResult(T data) {
- return RpcResultBuilder.<T>success(data).build();
+ private static <T> RpcResult<T> newSuccessfulResult(final T data) {
+ return RpcResultBuilder.success(data).build();
}
}