Bug 8621 - Add shutdown-shard-replica rpc to MdsalLowLevelTestProvider
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
index bea3bcd11f0c6f68c02c69cccc79c250bc2fc54c..f0569fa07dc4e8842d92504168b117832b99acec 100644 (file)
@@ -14,7 +14,10 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -33,6 +36,7 @@ import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStore
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
@@ -81,6 +85,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
@@ -104,6 +109,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
 
@@ -437,7 +443,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
         if (flappingSingletonService != null) {
             final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
+                    ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
         }
 
@@ -456,15 +462,19 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
         }
 
-        final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
-        try {
-            if (dtclReg != null) {
-                dtclReg.close();
-                dtclReg = null;
-            }
-
+        dtclReg.close();
+        dtclReg = null;
 
+        if (!idIntsListener.hasTriggered()) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+                            "any notifications.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+                    .withRpcError(error).build());
+        }
 
+        final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+        try {
             final Optional<NormalizedNode<?, ?>> readResult =
                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
 
@@ -560,6 +570,54 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         return settableFuture;
     }
 
+    @Override
+    public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
+        LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
+
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+                    "A valid shard name must be specified");
+            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+        }
+
+        final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
+        final ActorContext context = configDataStore.getActorContext();
+
+        long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+                .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+        final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+        final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+        context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
+                if (throwable != null) {
+                    shutdownShardAsk.failure(throwable);
+                } else {
+                    shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+                }
+            }
+        }, context.getClientDispatcher());
+
+        shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+                if (throwable != null) {
+                    final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
+                            .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+                    rpcResult.set(failedResult);
+                } else {
+                    // according to Patterns.gracefulStop API, we don't have to
+                    // check value of gracefulStopResult
+                    rpcResult.set(RpcResultBuilder.<Void>success().build());
+                }
+            }
+        }, context.getClientDispatcher());
+
+        return rpcResult;
+    }
+
     @Override
     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
 
@@ -599,6 +657,14 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         ddtlReg.close();
         ddtlReg = null;
 
+        if (!idIntsDdtl.hasTriggered()) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+                            "any notifications.");
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withRpcError(error).build());
+        }
+
         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
         LOG.debug("Creating distributed datastore client for shard {}", shardName);