+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
+ LOG.info("In unsubscribeDdtl");
+
+ if (idIntsDdtl == null || ddtlReg == null) {
+ return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
+ ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
+ }
+
+ long timeout = 120L;
+ try {
+ idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("Unable to finish notification processing", e);
+ return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+ "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
+ }
+
+ ddtlReg.close();
+ ddtlReg = null;
+
+ if (!idIntsDdtl.hasTriggered()) {
+ return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+ "No notification received.", "id-ints listener has not received any notifications").buildFuture();
+ }
+
+ final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+ final ActorContext actorContext = configDataStore.getActorContext();
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+ "Shard-" + shardName, actorContext, shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient distributedDataStoreClient;
+ try {
+ distributedDataStoreClient = SimpleDataStoreClientActor
+ .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (RuntimeException e) {
+ LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
+ }
+
+ final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+ final ClientTransaction tx = localHistory.createTransaction();
+ final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
+ tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
+
+ tx.abort();
+ localHistory.close();
+ try {
+ final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
+ if (!optional.isPresent()) {
+ return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+ "data-missing", "Final read from id-ints is empty").buildFuture();
+ }
+
+ return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
+ idIntsDdtl.checkEqual(optional.get()))).buildFuture();
+
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Unable to read data to verify ddtl data", e);
+ return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
+ } finally {
+ distributedDataStoreClient.close();
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ }