/* * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore.admin; import akka.actor.ActorRef; import akka.actor.Status.Success; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.io.FileOutputStream; import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList; import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Implements the yang RPCs defined in the generated ClusterAdminService interface. * * @author Thomas Pantelis */ public class ClusterAdminRpcService implements ClusterAdminService { private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); private final DistributedDataStoreInterface configDataStore; private final DistributedDataStoreInterface operDataStore; public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore, DistributedDataStoreInterface operDataStore) { this.configDataStore = configDataStore; this.operDataStore = operDataStore; } @Override public Future> addShardReplica(final AddShardReplicaInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); } DataStoreType dataStoreType = input.getDataStoreType(); if (dataStoreType == null) { return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } LOG.info("Adding replica for shard {}", shardName); final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Success success) { LOG.info("Successfully added replica for shard {}", shardName); returnFuture.set(newSuccessfulResult()); } @Override public void onFailure(Throwable failure) { onMessageFailure(String.format("Failed to add replica for shard %s", shardName), returnFuture, failure); } }); return returnFuture; } @Override public Future> removeShardReplica(RemoveShardReplicaInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); } DataStoreType dataStoreType = input.getDataStoreType(); if (dataStoreType == null) { return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } final String memberName = input.getMemberName(); if (Strings.isNullOrEmpty(memberName)) { return newFailedRpcResultFuture("A valid member name must be specified"); } LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType); final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture future = sendMessageToShardManager(dataStoreType, new RemoveShardReplica(shardName, MemberName.forName(memberName))); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Success success) { LOG.info("Successfully removed replica for shard {}", shardName); returnFuture.set(newSuccessfulResult()); } @Override public void onFailure(Throwable failure) { onMessageFailure(String.format("Failed to remove replica for shard %s", shardName), returnFuture, failure); } }); return returnFuture; } @Override public Future> addReplicasForAllShards() { LOG.info("Adding replicas for all shards"); final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); Function messageSupplier = shardName -> new AddShardReplica(shardName); sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); return waitForShardResults(shardResultData, shardResults -> new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(), "Failed to add replica"); } @Override public Future> removeAllShardReplicas(RemoveAllShardReplicasInput input) { LOG.info("Removing replicas for all shards"); final String memberName = input.getMemberName(); if (Strings.isNullOrEmpty(memberName)) { return newFailedRpcResultFuture("A valid member name must be specified"); } final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); Function messageSupplier = shardName -> new RemoveShardReplica(shardName, MemberName.forName(memberName)); sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); return waitForShardResults(shardResultData, shardResults -> new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(), " Failed to remove replica"); } @Override public Future> changeMemberVotingStatesForShard(ChangeMemberVotingStatesForShardInput input) { final String shardName = input.getShardName(); if (Strings.isNullOrEmpty(shardName)) { return newFailedRpcResultFuture("A valid shard name must be specified"); } DataStoreType dataStoreType = input.getDataStoreType(); if (dataStoreType == null) { return newFailedRpcResultFuture("A valid DataStoreType must be specified"); } List memberVotingStates = input.getMemberVotingState(); if (memberVotingStates == null || memberVotingStates.isEmpty()) { return newFailedRpcResultFuture("No member voting state input was specified"); } ChangeShardMembersVotingStatus changeVotingStatus = toChangeShardMembersVotingStatus(shardName, memberVotingStates); LOG.info("Change member voting states for shard {}: {}", shardName, changeVotingStatus.getMeberVotingStatusMap()); final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture future = sendMessageToShardManager(dataStoreType, changeVotingStatus); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Success success) { LOG.info("Successfully changed member voting states for shard {}", shardName); returnFuture.set(newSuccessfulResult()); } @Override public void onFailure(Throwable failure) { onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName), returnFuture, failure); } }); return returnFuture; } @Override public Future> changeMemberVotingStatesForAllShards( final ChangeMemberVotingStatesForAllShardsInput input) { List memberVotingStates = input.getMemberVotingState(); if (memberVotingStates == null || memberVotingStates.isEmpty()) { return newFailedRpcResultFuture("No member voting state input was specified"); } final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); Function messageSupplier = shardName -> toChangeShardMembersVotingStatus(shardName, memberVotingStates); LOG.info("Change member voting states for all shards"); sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); return waitForShardResults(shardResultData, shardResults -> new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(), "Failed to change member voting states"); } @Override public Future> flipMemberVotingStatesForAllShards() { final List, ShardResultBuilder>> shardResultData = new ArrayList<>(); Function messageSupplier = shardName -> new FlipShardMembersVotingStatus(shardName); LOG.info("Flip member voting states for all shards"); sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier); sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier); return waitForShardResults(shardResultData, shardResults -> new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(), "Failed to change member voting states"); } @Override public Future> backupDatastore(final BackupDatastoreInput input) { LOG.debug("backupDatastore: {}", input); if (Strings.isNullOrEmpty(input.getFilePath())) { return newFailedRpcResultFuture("A valid file path must be specified"); } final SettableFuture> returnFuture = SettableFuture.create(); ListenableFuture> future = sendMessageToShardManagers(GetSnapshot.INSTANCE); Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(List snapshots) { saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture); } @Override public void onFailure(Throwable failure) { onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure); } }); return returnFuture; } private ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName, List memberVotingStatus) { Map serverVotingStatusMap = new HashMap<>(); for (MemberVotingState memberStatus: memberVotingStatus) { serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.isVoting()); } ChangeShardMembersVotingStatus changeVotingStatus = new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap); return changeVotingStatus; } private static SettableFuture> waitForShardResults( final List, ShardResultBuilder>> shardResultData, final Function, T> resultDataSupplier, final String failureLogMsgPrefix) { final SettableFuture> returnFuture = SettableFuture.create(); final List shardResults = new ArrayList<>(); for (final Entry, ShardResultBuilder> entry : shardResultData) { Futures.addCallback(entry.getKey(), new FutureCallback() { @Override public void onSuccess(Success result) { synchronized (shardResults) { ShardResultBuilder shardResult = entry.getValue(); LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(), shardResult.getDataStoreType()); shardResults.add(shardResult.setSucceeded(true).build()); checkIfComplete(); } } @Override public void onFailure(Throwable failure) { synchronized (shardResults) { ShardResultBuilder shardResult = entry.getValue(); LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(), shardResult.getDataStoreType(), failure); shardResults.add(shardResult.setSucceeded(false).setErrorMessage( Throwables.getRootCause(failure).getMessage()).build()); checkIfComplete(); } } void checkIfComplete() { LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size()); if (shardResults.size() == shardResultData.size()) { returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults))); } } }); } return returnFuture; } private void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType, List, ShardResultBuilder>> shardResultData, Function messageSupplier) { ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext() : operDataStore.getActorContext(); Set allShardNames = actorContext.getConfiguration().getAllShardNames(); LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName()); for (String shardName: allShardNames) { ListenableFuture future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName), SHARD_MGR_TIMEOUT); shardResultData.add(new SimpleEntry<>(future, new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType))); } } @SuppressWarnings("unchecked") private ListenableFuture> sendMessageToShardManagers(Object message) { Timeout timeout = SHARD_MGR_TIMEOUT; ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout); ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout); return Futures.allAsList(configFuture, operFuture); } private ListenableFuture sendMessageToShardManager(DataStoreType dataStoreType, Object message) { ActorRef shardManager = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager(); return ask(shardManager, message, SHARD_MGR_TIMEOUT); } @SuppressWarnings("checkstyle:IllegalCatch") private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName, SettableFuture> returnFuture) { try (FileOutputStream fos = new FileOutputStream(fileName)) { SerializationUtils.serialize(snapshots, fos); returnFuture.set(newSuccessfulResult()); LOG.info("Successfully backed up datastore to file {}", fileName); } catch (IOException | RuntimeException e) { onDatastoreBackupFailure(fileName, returnFuture, e); } } private static void onDatastoreBackupFailure(String fileName, SettableFuture> returnFuture, Throwable failure) { onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure); } private static void onMessageFailure(String msg, final SettableFuture> returnFuture, Throwable failure) { LOG.error(msg, failure); returnFuture.set(ClusterAdminRpcService.newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build()); } private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) { final SettableFuture returnFuture = SettableFuture.create(); @SuppressWarnings("unchecked") scala.concurrent.Future askFuture = (scala.concurrent.Future) Patterns.ask(actor, message, timeout); askFuture.onComplete(new OnComplete() { @Override public void onComplete(Throwable failure, T resp) { if (failure != null) { returnFuture.setException(failure); } else { returnFuture.set(resp); } } }, configDataStore.getActorContext().getClientDispatcher()); return returnFuture; } private static ListenableFuture> newFailedRpcResultFuture(String message) { return ClusterAdminRpcService.newFailedRpcResultBuilder(message).buildFuture(); } private static RpcResultBuilder newFailedRpcResultBuilder(String message) { return newFailedRpcResultBuilder(message, null); } private static RpcResultBuilder newFailedRpcResultBuilder(String message, Throwable cause) { return RpcResultBuilder.failed().withError(ErrorType.RPC, message, cause); } private static RpcResult newSuccessfulResult() { return newSuccessfulResult((Void)null); } private static RpcResult newSuccessfulResult(T data) { return RpcResultBuilder.success(data).build(); } }