+ LOG.debug("Received unsubscribe-ddtl.");
+
+ if (idIntsDdtl == null || ddtlReg == null) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ try {
+ idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
+ "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ ddtlReg.close();
+ ddtlReg = null;
+
+ if (!idIntsDdtl.hasTriggered()) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
+ + "any notifications.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+ final ActorContext actorContext = configDataStore.getActorContext();
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+ "Shard-" + shardName, actorContext, shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient distributedDataStoreClient;
+ try {
+ distributedDataStoreClient = SimpleDataStoreClientActor
+ .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+ } catch (RuntimeException e) {
+ LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "Unable to create ds client for read.",
+ "Unable to create ds client for read.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+ final ClientTransaction tx = localHistory.createTransaction();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+ org.opendaylight.mdsal.common.api.ReadFailedException> read =
+ tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
+
+ tx.abort();
+ localHistory.close();
+ try {
+ final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+ if (!optional.isPresent()) {
+ LOG.warn("Final read from client is empty.");
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ return Futures.immediateFuture(
+ RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
+ .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
+
+ } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
+ LOG.error("Unable to read data to verify ddtl data.", e);
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ } finally {
+ distributedDataStoreClient.close();
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ }