+ if (!idIntsDdtl.hasTriggered()) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+ "any notifications.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
+ final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+ final ActorContext actorContext = configDataStore.getActorContext();
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+ "Shard-" + shardName, actorContext, shardName);
+
+ final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient distributedDataStoreClient;