+ final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
+
+ if (task == null) {
+ return Futures.immediateFuture(RpcResultBuilder.success(
+ new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
+ }
+
+ final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
+ new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
+
+ if (task.getLastError() != null) {
+ LOG.error("Last error for {}", task, task.getLastError());
+ checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
+ }
+
+ final CheckPublishNotificationsOutput output =
+ checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
+
+ return RpcResultBuilder.success(output).buildFuture();
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
+ final ProduceTransactionsInput input) {
+ LOG.info("In produceTransactions - input: {}", input);
+ return ProduceTransactionsHandler.start(domDataTreeService, input);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
+ final ShutdownShardReplicaInput input) {
+ LOG.info("In shutdownShardReplica - input: {}", input);
+
+ final String shardName = input.getShardName();
+ if (Strings.isNullOrEmpty(shardName)) {
+ return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
+ shardName + "is not a valid shard name").buildFuture();
+ }
+
+ return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
+ final ShutdownPrefixShardReplicaInput input) {
+ LOG.info("shutdownPrefixShardReplica - input: {}", input);
+
+ final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+ if (shardPrefix == null) {
+ return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
+ "A valid shard prefix must be specified").buildFuture();
+ }
+
+ final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+ final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
+
+ return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
+ }
+
+ private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
+ final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
+ final ActorUtils context = configDataStore.getActorUtils();
+
+ long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+ final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+ final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+ context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable throwable, final ActorRef actorRef) {
+ if (throwable != null) {
+ shutdownShardAsk.failure(throwable);
+ } else {
+ shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+ }
+ }
+ }, context.getClientDispatcher());
+
+ shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
+ if (throwable != null) {
+ final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
+ .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+ rpcResult.set(failedResult);
+ } else {
+ // according to Patterns.gracefulStop API, we don't have to
+ // check value of gracefulStopResult
+ rpcResult.set(RpcResultBuilder.success(success).build());
+ }
+ }
+ }, context.getClientDispatcher());
+ return rpcResult;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
+ LOG.info("In registerConstant - input: {}", input);