- LOG.info("In unsubscribeDdtl");
-
- if (idIntsDdtl == null || ddtlReg == null) {
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
- ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
- }
-
- long timeout = 120L;
- try {
- idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Unable to finish notification processing", e);
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
- "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
- }
-
- ddtlReg.close();
- ddtlReg = null;
-
- if (!idIntsDdtl.hasTriggered()) {
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
- "No notification received.", "id-ints listener has not received any notifications").buildFuture();
- }
-
- final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
- LOG.debug("Creating distributed datastore client for shard {}", shardName);
-
- final ActorUtils actorUtils = configDataStore.getActorUtils();
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(),
- "Shard-" + shardName, actorUtils, shardName);
-
- final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
- final DataStoreClient distributedDataStoreClient;
- try {
- distributedDataStoreClient = SimpleDataStoreClientActor
- .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (RuntimeException e) {
- LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
- clientActor.tell(PoisonPill.getInstance(), noSender());
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
- }
-
- final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
- final ClientTransaction tx = localHistory.createTransaction();
- final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
- tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
-
- tx.abort();
- localHistory.close();
- try {
- final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
- if (!optional.isPresent()) {
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
- "data-missing", "Final read from id-ints is empty").buildFuture();
- }
-
- return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
- idIntsDdtl.checkEqual(optional.get()))).buildFuture();
-
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to read data to verify ddtl data", e);
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
- } finally {
- distributedDataStoreClient.close();
- clientActor.tell(PoisonPill.getInstance(), noSender());
- }