import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
if (flappingSingletonService != null) {
final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
+ ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
}
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
}
- final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
- try {
- if (dtclReg != null) {
- dtclReg.close();
- dtclReg = null;
- }
-
+ dtclReg.close();
+ dtclReg = null;
+ if (!idIntsListener.hasTriggered()) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+ "any notifications.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+ .withRpcError(error).build());
+ }
+ final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+ try {
final Optional<NormalizedNode<?, ?>> readResult =
rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
return settableFuture;
}
+ @Override
+ public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
+ LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
+
+ final String shardName = input.getShardName();
+ if (Strings.isNullOrEmpty(shardName)) {
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+ "A valid shard name must be specified");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ }
+
+ final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
+ final ActorContext context = configDataStore.getActorContext();
+
+ long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+ final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+ final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+ context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ shutdownShardAsk.failure(throwable);
+ } else {
+ shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+ }
+ }
+ }, context.getClientDispatcher());
+
+ shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+ if (throwable != null) {
+ final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
+ .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+ rpcResult.set(failedResult);
+ } else {
+ // according to Patterns.gracefulStop API, we don't have to
+ // check value of gracefulStopResult
+ rpcResult.set(RpcResultBuilder.<Void>success().build());
+ }
+ }
+ }, context.getClientDispatcher());
+
+ return rpcResult;
+ }
+
@Override
public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
ddtlReg.close();
ddtlReg = null;
+ if (!idIntsDdtl.hasTriggered()) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+ "any notifications.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
LOG.debug("Creating distributed datastore client for shard {}", shardName);