import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
return settableFuture;
}
+ @Override
+ public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
+ LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
+
+ final String shardName = input.getShardName();
+ if (Strings.isNullOrEmpty(shardName)) {
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+ "A valid shard name must be specified");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ }
+
+ final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
+ final ActorContext context = configDataStore.getActorContext();
+
+ long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+ final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+ final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+ context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ shutdownShardAsk.failure(throwable);
+ } else {
+ shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+ }
+ }
+ }, context.getClientDispatcher());
+
+ shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+ if (throwable != null) {
+ final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
+ .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+ rpcResult.set(failedResult);
+ } else {
+ // according to Patterns.gracefulStop API, we don't have to
+ // check value of gracefulStopResult
+ rpcResult.set(RpcResultBuilder.<Void>success().build());
+ }
+ }
+ }, context.getClientDispatcher());
+
+ return rpcResult;
+ }
+
@Override
public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {