- LOG.debug("Received unsubscribe-ddtl.");
-
- if (idIntsDdtl == null || ddtlReg == null) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- }
-
- try {
- idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
- "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- }
-
- ddtlReg.close();
- ddtlReg = null;
-
- if (!idIntsDdtl.hasTriggered()) {
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
- + "any notifications.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- }
-
- final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
- LOG.debug("Creating distributed datastore client for shard {}", shardName);
-
- final ActorContext actorContext = configDataStore.getActorContext();
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
- "Shard-" + shardName, actorContext, shardName);
-
- final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
- final DataStoreClient distributedDataStoreClient;
- try {
- distributedDataStoreClient = SimpleDataStoreClientActor
- .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (RuntimeException e) {
- LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
- clientActor.tell(PoisonPill.getInstance(), noSender());
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Unable to create ds client for read.",
- "Unable to create ds client for read.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- }
-
- final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
- final ClientTransaction tx = localHistory.createTransaction();
- final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
- tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
-
- tx.abort();
- localHistory.close();
- try {
- final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
- if (!optional.isPresent()) {
- LOG.warn("Final read from client is empty.");
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- }
-
- return Futures.immediateFuture(
- RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
- .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
-
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to read data to verify ddtl data.", e);
- final RpcError error = RpcResultBuilder.newError(
- ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
- return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withRpcError(error).build());
- } finally {
- distributedDataStoreClient.close();
- clientActor.tell(PoisonPill.getInstance(), noSender());
- }