+ @Override
+ public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
+ LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
+
+ final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+ if (shardPrefix == null) {
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+ "A valid shard prefix must be specified");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ }
+
+ final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+ final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
+
+ return shutdownShardGracefully(cleanPrefixShardName);
+ }
+
+ private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
+ final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
+ final ActorContext context = configDataStore.getActorContext();
+
+ long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+ final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+ final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+ context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ shutdownShardAsk.failure(throwable);
+ } else {
+ shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+ }
+ }
+ }, context.getClientDispatcher());
+
+ shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+ if (throwable != null) {
+ final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
+ .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+ rpcResult.set(failedResult);
+ } else {
+ // according to Patterns.gracefulStop API, we don't have to
+ // check value of gracefulStopResult
+ rpcResult.set(RpcResultBuilder.<Void>success().build());
+ }
+ }
+ }, context.getClientDispatcher());
+ return rpcResult;