+ LOG.debug("Received unsubscribe-ddtl.");
+
+ if (idIntsDdtl == null || ddtlReg == null) {
+ return failure(newError(ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered."));
+ }
+
+ final ListenableFuture<DataListenerState> future = idIntsDdtl.tryFinishProcessing(ddtlReg);
+ ddtlReg = null;
+
+ return Futures.withFallback(Futures.transform(future, this::ddtlOutput),
+ t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
+ "clustering-it", "clustering-it", t)));
+ }
+
+ private RpcResult<UnsubscribeDdtlOutput> ddtlOutput(final DataListenerState state) {
+ if (state.changeCount() == 0) {
+ return failed(newError(ErrorType.APPLICATION, "No notification received.",
+ "id-ints listener has not received any notifications."));
+ }
+
+ final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+ final ActorContext actorContext = configDataStore.getActorContext();
+ final Props distributedDataStoreClientProps = SimpleDataStoreClientActor.props(
+ actorContext.getCurrentMemberName(), "Shard-" + shardName, actorContext, shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient distributedDataStoreClient;
+ try {
+ distributedDataStoreClient = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30,
+ TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ return failed(newError(ErrorType.APPLICATION, "Unable to create ds client for read.",
+ "Unable to create ds client for read."));
+ }
+
+ final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+ final ClientTransaction tx = localHistory.createTransaction();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+ org.opendaylight.mdsal.common.api.ReadFailedException> read =
+ tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
+
+ tx.abort();
+ localHistory.close();
+
+ final Optional<NormalizedNode<?, ?>> readResult;
+ try {
+ readResult = read.checkedGet();
+ } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
+ LOG.error("Unable to read data to verify ddtl data.", e);
+ return failed(newError( ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
+ } finally {
+ distributedDataStoreClient.close();
+ clientActor.tell(PoisonPill.getInstance(), noSender());
+ }
+
+ // FIXME run a diff
+ final NormalizedNode<?, ?> expected = state.lastData().orNull();
+ final NormalizedNode<?, ?> actual = readResult.orNull();
+ final boolean equal = Objects.equals(expected, actual);
+ if (!equal) {
+ LOG.debug("Expected result {} read resulted in {}", expected, actual);
+ }
+ final RpcResultBuilder<UnsubscribeDdtlOutput> b = RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
+ .setCopyMatches(equal).build());
+
+// for (DataListenerViolation violation : state.violations()) {
+// final Optional<NormalizedNodeDiff> diff = violation.toDiff();
+// if (diff.isPresent()) {
+// b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
+// }
+// }
+
+ return b.build();
+ }
+
+ private static <T> RpcResult<T> failed(final RpcError error) {
+ return RpcResultBuilder.<T>failed().withRpcError(error).build();
+ }
+
+ private static <T> ListenableFuture<RpcResult<T>> failure(final RpcError error) {
+ return Futures.immediateFuture(failed(error));
+ }
+
+ private static <T> ListenableFuture<RpcResult<T>> success(final T result) {
+ return Futures.immediateFuture(RpcResultBuilder.success(result).build());