Add info logging and improve error reporting in clustering-it
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
index 8f67f136edb13f557f75f173c06ea299559c287f..bae07376cfa824422570a179c9c070f0b348ea56 100644 (file)
@@ -5,23 +5,36 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.clustering.it.provider;
 
 package org.opendaylight.controller.clustering.it.provider;
 
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
@@ -37,7 +50,6 @@ import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactions
 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
@@ -46,39 +58,84 @@ import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistrati
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
@@ -87,14 +144,14 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
 
 
 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
 
@@ -105,29 +162,31 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final DistributedShardFactory distributedShardFactory;
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final DistributedShardFactory distributedShardFactory;
+    private final DistributedDataStoreInterface configDataStore;
     private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
     private final NotificationPublishService notificationPublishService;
     private final NotificationService notificationService;
     private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
     private final NotificationPublishService notificationPublishService;
     private final NotificationService notificationService;
-    private final SchemaService schemaService;
+    private final DOMSchemaService schemaService;
     private final ClusterSingletonServiceProvider singletonService;
     private final DOMRpcProviderService domRpcService;
     private final PrefixLeaderHandler prefixLeaderHandler;
     private final PrefixShardHandler prefixShardHandler;
     private final DOMDataTreeChangeService domDataTreeChangeService;
     private final ClusterSingletonServiceProvider singletonService;
     private final DOMRpcProviderService domRpcService;
     private final PrefixLeaderHandler prefixLeaderHandler;
     private final PrefixShardHandler prefixShardHandler;
     private final DOMDataTreeChangeService domDataTreeChangeService;
+    private final ActorSystem actorSystem;
 
 
-    private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
-            new HashMap<>();
+    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
+            routedRegistrations = new HashMap<>();
 
 
-    private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
     private IdIntsListener idIntsListener;
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
     private IdIntsListener idIntsListener;
-    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
     private IdIntsDOMDataTreeLIstener idIntsDdtl;
 
     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
     private IdIntsDOMDataTreeLIstener idIntsDdtl;
 
@@ -136,13 +195,15 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
                                      final ClusterSingletonServiceProvider singletonService,
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
                                      final ClusterSingletonServiceProvider singletonService,
-                                     final SchemaService schemaService,
+                                     final DOMSchemaService schemaService,
                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                      final NotificationPublishService notificationPublishService,
                                      final NotificationService notificationService,
                                      final DOMDataBroker domDataBroker,
                                      final DOMDataTreeService domDataTreeService,
                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                      final NotificationPublishService notificationPublishService,
                                      final NotificationService notificationService,
                                      final DOMDataBroker domDataBroker,
                                      final DOMDataTreeService domDataTreeService,
-                                     final DistributedShardFactory distributedShardFactory) {
+                                     final DistributedShardFactory distributedShardFactory,
+                                     final DistributedDataStoreInterface configDataStore,
+                                     final ActorSystemProvider actorSystemProvider) {
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
@@ -153,6 +214,9 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.domDataBroker = domDataBroker;
         this.domDataTreeService = domDataTreeService;
         this.distributedShardFactory = distributedShardFactory;
         this.domDataBroker = domDataBroker;
         this.domDataTreeService = domDataTreeService;
         this.distributedShardFactory = distributedShardFactory;
+        this.configDataStore = configDataStore;
+        this.actorSystem = actorSystemProvider.getActorSystem();
+
         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
 
         domDataTreeChangeService =
         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
 
         domDataTreeChangeService =
@@ -165,34 +229,33 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterSingletonConstant() {
-        LOG.debug("unregister-singleton-constant");
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
+            final UnregisterSingletonConstantInput input) {
+        LOG.info("In unregisterSingletonConstant");
 
         if (getSingletonConstantRegistration == null) {
 
         if (getSingletonConstantRegistration == null) {
-            LOG.debug("No get-singleton-constant registration present.");
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
+                    "No prior RPC was registered").buildFuture();
         }
 
         try {
             getSingletonConstantRegistration.close();
             getSingletonConstantRegistration = null;
 
         }
 
         try {
             getSingletonConstantRegistration.close();
             getSingletonConstantRegistration = null;
 
-            return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
-        } catch (final Exception e) {
-            LOG.debug("There was a problem closing the singleton constant service", e);
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
+        } catch (Exception e) {
+            String msg = "Error closing the singleton constant service";
+            LOG.error(msg, e);
+            return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
+                    ErrorType.APPLICATION, msg, e).buildFuture();
         }
     }
 
     @Override
         }
     }
 
     @Override
-    public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
-        LOG.debug("publish-notifications, input: {}", input);
+    public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
+            final StartPublishNotificationsInput input) {
+        LOG.info("In startPublishNotifications - input: {}", input);
 
         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
                 input.getSeconds(), input.getNotificationsPerSecond());
 
         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
                 input.getSeconds(), input.getNotificationsPerSecond());
@@ -201,16 +264,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
         task.start();
 
 
         task.start();
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> subscribeDtcl() {
+    public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
+        LOG.info("In subscribeDtcl - input: {}", input);
 
         if (dtclReg != null) {
 
         if (dtclReg != null) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already dataTreeChangeListener registered on id-ints list.");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
         }
 
         idIntsListener = new IdIntsListener();
         }
 
         idIntsListener = new IdIntsListener();
@@ -221,290 +284,286 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
                         idIntsListener);
 
                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
                         idIntsListener);
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
-        LOG.debug("write-transactions, input: {}", input);
-
-        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
-        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
-        writeTransactionsHandler.start(settableFuture);
-
-        return settableFuture;
+    public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
+        return WriteTransactionsHandler.start(domDataBroker, input);
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
+    public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+    public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
+            final RemoveShardReplicaInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
-
-        LOG.debug("subscribe-ynl, input: {}", input);
+    public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
+        LOG.info("In subscribeYnl - input: {}", input);
 
         if (ynlRegistrations.containsKey(input.getId())) {
 
         if (ynlRegistrations.containsKey(input.getId())) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already ynl listener registered for this id: " + input.getId());
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
         }
 
         ynlRegistrations.put(input.getId(),
                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
 
         }
 
         ynlRegistrations.put(input.getId(),
                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
-        LOG.debug("remove-prefix-shard, input: {}", input);
+    public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
+        LOG.info("In removePrefixShard - input: {}", input);
 
         return prefixShardHandler.onRemovePrefixShard(input);
     }
 
     @Override
 
         return prefixShardHandler.onRemovePrefixShard(input);
     }
 
     @Override
-    public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
-        LOG.debug("become-prefix-leader, input: {}", input);
+    public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
+            final BecomePrefixLeaderInput input) {
+        LOG.info("n becomePrefixLeader - input: {}", input);
 
         return prefixLeaderHandler.makeLeaderLocal(input);
     }
 
     @Override
 
         return prefixLeaderHandler.makeLeaderLocal(input);
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
-        LOG.debug("unregister-bound-constant, {}", input);
+    public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
+            final UnregisterBoundConstantInput input) {
+        LOG.info("In unregisterBoundConstant - {}", input);
 
 
-        final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
+        final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
                 routedRegistrations.remove(input.getContext());
 
                 routedRegistrations.remove(input.getContext());
 
-        if (registration == null) {
-            LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+        if (rpcRegistration == null) {
+            return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
         }
 
         }
 
-        registration.close();
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        rpcRegistration.close();
+        return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
-
-        LOG.debug("Received register-singleton-constant rpc, input: {}", input);
+    public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
+            final RegisterSingletonConstantInput input) {
+        LOG.info("In registerSingletonConstant - input: {}", input);
 
         if (input.getConstant() == null) {
 
         if (input.getConstant() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Constant value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
         }
 
         getSingletonConstantRegistration =
                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
 
         }
 
         getSingletonConstantRegistration =
                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+    public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
+            final RegisterDefaultConstantInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterConstant() {
+    public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
+            final UnregisterConstantInput input) {
+        LOG.info("In unregisterConstant");
 
         if (globalGetConstantRegistration == null) {
 
         if (globalGetConstantRegistration == null) {
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
         }
 
         globalGetConstantRegistration.close();
         globalGetConstantRegistration = null;
 
         }
 
         globalGetConstantRegistration.close();
         globalGetConstantRegistration = null;
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
-        LOG.debug("unregister-flapping-singleton received.");
+    public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
+            final UnregisterFlappingSingletonInput input) {
+        LOG.info("In unregisterFlappingSingleton");
 
         if (flappingSingletonService == null) {
 
         if (flappingSingletonService == null) {
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
-            final RpcResult<UnregisterFlappingSingletonOutput> result =
-                    RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
         }
 
         final long flapCount = flappingSingletonService.setInactive();
         flappingSingletonService = null;
 
         }
 
         final long flapCount = flappingSingletonService.setInactive();
         flappingSingletonService = null;
 
-        final UnregisterFlappingSingletonOutput output =
-                new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
-
-        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+        return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
+                .buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+    public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> subscribeDdtl() {
+    public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
+        LOG.info("In subscribeDdtl");
 
         if (ddtlReg != null) {
 
         if (ddtlReg != null) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already dataTreeChangeListener registered on id-ints list.");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already a listener registered for id-ints").buildFuture();
         }
 
         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
 
         try {
         }
 
         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
 
         try {
-            ddtlReg =
-                    domDataTreeService.registerListener(idIntsDdtl,
-                            Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-                                    ProduceTransactionsHandler.ID_INT_YID))
-                            , true, Collections.emptyList());
+            ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
+                    Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+                            ProduceTransactionsHandler.ID_INT_YID)),
+                    true, Collections.emptyList());
         } catch (DOMDataTreeLoopException e) {
         } catch (DOMDataTreeLoopException e) {
-            LOG.error("Failed to register DOMDataTreeListener.", e);
-
+            LOG.error("Failed to register DOMDataTreeListener", e);
+            return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
+                ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
         }
 
         }
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
-        LOG.debug("register-bound-constant: {}", input);
+    public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
+            final RegisterBoundConstantInput input) {
+        LOG.info("In registerBoundConstant - input: {}", input);
 
         if (input.getContext() == null) {
 
         if (input.getContext() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Context value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
         }
 
         if (input.getConstant() == null) {
         }
 
         if (input.getConstant() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Constant value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
         }
 
         if (routedRegistrations.containsKey(input.getContext())) {
         }
 
         if (routedRegistrations.containsKey(input.getContext())) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already a rpc registered for context: " + input.getContext());
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
         }
 
         }
 
-        final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
+        final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
                         input.getConstant(), input.getContext());
 
                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
                         input.getConstant(), input.getContext());
 
-        routedRegistrations.put(input.getContext(), registration);
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        routedRegistrations.put(input.getContext(), rpcRegistration);
+        return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerFlappingSingleton() {
-        LOG.debug("Received register-flapping-singleton.");
+    public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
+            final RegisterFlappingSingletonInput input) {
+        LOG.info("In registerFlappingSingleton");
 
         if (flappingSingletonService != null) {
 
         if (flappingSingletonService != null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already an rpc registered").buildFuture();
         }
 
         flappingSingletonService = new FlappingSingletonService(singletonService);
 
         }
 
         flappingSingletonService = new FlappingSingletonService(singletonService);
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
-        LOG.debug("Received unsubscribe-dtcl");
+    public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
+        LOG.info("In unsubscribeDtcl");
 
         if (idIntsListener == null || dtclReg == null) {
 
         if (idIntsListener == null || dtclReg == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
-            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
+                    ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
         }
 
         }
 
-        final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+        long timeout = 120L;
         try {
         try {
-            if (dtclReg != null) {
-                dtclReg.close();
-                dtclReg = null;
-            }
+            idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Unable to finish notification processing", e);
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
+                    "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
+        }
 
 
+        dtclReg.close();
+        dtclReg = null;
+
+        if (!idIntsListener.hasTriggered()) {
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
+                    "id-ints listener has not received any notifications.").buildFuture();
+        }
+
+        final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+        try {
             final Optional<NormalizedNode<?, ?>> readResult =
             final Optional<NormalizedNode<?, ?>> readResult =
-                    rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
+                    rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
 
             if (!readResult.isPresent()) {
 
             if (!readResult.isPresent()) {
-                final RpcError error = RpcResultBuilder.newError(
-                        ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
-                return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
-                        .withRpcError(error).build());
+                return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
+                        "No data read from id-ints list").buildFuture();
             }
 
             }
 
-            return Futures.immediateFuture(
-                    RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
-                            .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
+            final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
+            if (!nodesEqual) {
+                LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
+                        idIntsListener.diffWithLocalCopy(readResult.get()));
+            }
 
 
-        } catch (final ReadFailedException e) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
-            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
-                    .withRpcError(error).build());
+            return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
+                    .buildFuture();
 
 
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.error("Final read of id-ints failed", e);
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
+                    "Final read of id-ints failed", e).buildFuture();
         }
     }
 
     @Override
         }
     }
 
     @Override
-    public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
-        LOG.debug("create-prefix-shard, input: {}", input);
+    public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
+        LOG.info("In createPrefixShard - input: {}", input);
 
         return prefixShardHandler.onCreatePrefixShard(input);
     }
 
     @Override
 
         return prefixShardHandler.onCreatePrefixShard(input);
     }
 
     @Override
-    public Future<RpcResult<Void>> deconfigureIdIntsShard() {
+    public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
+            final DeconfigureIdIntsShardInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
-        LOG.debug("Received unsubscribe-ynl, input: {}", input);
+    public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
+        LOG.info("In unsubscribeYnl - input: {}", input);
 
         if (!ynlRegistrations.containsKey(input.getId())) {
 
         if (!ynlRegistrations.containsKey(input.getId())) {
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
-            final RpcResult<UnsubscribeYnlOutput> result =
-                    RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
         }
 
         }
 
-        final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
-        final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
+        final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
+        final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
 
 
-        registration.close();
+        reg.close();
 
 
-        return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
+        return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
+    public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
             final CheckPublishNotificationsInput input) {
             final CheckPublishNotificationsInput input) {
+        LOG.info("In checkPublishNotifications - input: {}", input);
 
         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
 
 
         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
 
@@ -517,131 +576,187 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
 
         if (task.getLastError() != null) {
                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
 
         if (task.getLastError() != null) {
-            final StringWriter sw = new StringWriter();
-            final PrintWriter pw = new PrintWriter(sw);
-            task.getLastError().printStackTrace(pw);
-            checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
+            LOG.error("Last error for {}", task, task.getLastError());
+            checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
         }
 
         final CheckPublishNotificationsOutput output =
                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
 
         }
 
         final CheckPublishNotificationsOutput output =
                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
 
-        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+        return RpcResultBuilder.success(output).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
-        LOG.debug("producer-transactions, input: {}", input);
-
-        final ProduceTransactionsHandler handler =
-                new ProduceTransactionsHandler(domDataTreeService, input);
+    public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
+            final ProduceTransactionsInput input) {
+        LOG.info("In produceTransactions - input: {}", input);
+        return ProduceTransactionsHandler.start(domDataTreeService, input);
+    }
 
 
-        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
-        handler.start(settableFuture);
+    @Override
+    public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
+            final ShutdownShardReplicaInput input) {
+        LOG.info("In shutdownShardReplica - input: {}", input);
+
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
+                shardName + "is not a valid shard name").buildFuture();
+        }
 
 
-        return settableFuture;
+        return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
+    public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
+            final ShutdownPrefixShardReplicaInput input) {
+        LOG.info("shutdownPrefixShardReplica - input: {}", input);
+
+        final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+        if (shardPrefix == null) {
+            return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
+                    "A valid shard prefix must be specified").buildFuture();
+        }
 
 
-        LOG.debug("Received register-constant rpc, input: {}", input);
+        final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+        final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
+
+        return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
+    }
+
+    private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
+        final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
+        final ActorContext context = configDataStore.getActorContext();
+
+        long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+                .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+        final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+        final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+        context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable throwable, final ActorRef actorRef) {
+                if (throwable != null) {
+                    shutdownShardAsk.failure(throwable);
+                } else {
+                    shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+                }
+            }
+        }, context.getClientDispatcher());
+
+        shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
+                if (throwable != null) {
+                    final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
+                            .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+                    rpcResult.set(failedResult);
+                } else {
+                    // according to Patterns.gracefulStop API, we don't have to
+                    // check value of gracefulStopResult
+                    rpcResult.set(RpcResultBuilder.success(success).build());
+                }
+            }
+        }, context.getClientDispatcher());
+        return rpcResult;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
+        LOG.info("In registerConstant - input: {}", input);
 
         if (input.getConstant() == null) {
 
         if (input.getConstant() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Constant value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
         }
 
         if (globalGetConstantRegistration != null) {
         }
 
         if (globalGetConstantRegistration != null) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already a get-constant rpc registered.");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
+                    "data-exists", "There is already an rpc registered").buildFuture();
         }
 
         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
         }
 
         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterDefaultConstant() {
+    public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
+            final UnregisterDefaultConstantInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
-        LOG.debug("Received unsubscribe-ddtl.");
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
+        LOG.info("In unsubscribeDdtl");
 
         if (idIntsDdtl == null || ddtlReg == null) {
 
         if (idIntsDdtl == null || ddtlReg == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
-            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
+                    ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
         }
 
         }
 
-        ddtlReg.close();
-        ddtlReg = null;
-
-        final ReadListener readListener = new ReadListener();
+        long timeout = 120L;
         try {
         try {
-            final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
-                    Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-                            ProduceTransactionsHandler.ID_INT_YID))
-                    , true, Collections.emptyList());
-
-            final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
-            registration.close();
-
-            if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
-                final RpcError error = RpcResultBuilder.newError(
-                        ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
-                return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
-                        .withRpcError(error).build());
-            }
-
-            final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
-
-            return Futures.immediateFuture(
-                    RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
-                            .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
-
-        } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
-            LOG.error("Unable to read data to verify ddtl data.", e);
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
-            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
-                    .withRpcError(error).build());
+            idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Unable to finish notification processing", e);
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+                    "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
         }
         }
-    }
 
 
-    private static class ReadListener implements DOMDataTreeListener {
+        ddtlReg.close();
+        ddtlReg = null;
 
 
-        private Collection<DataTreeCandidate> changes = null;
-        private SettableFuture<DataTreeCandidate> readFuture;
+        if (!idIntsDdtl.hasTriggered()) {
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+                    "No notification received.", "id-ints listener has not received any notifications").buildFuture();
+        }
 
 
-        @Override
-        public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
-                                      @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
-            Preconditions.checkArgument(changes.size() == 1);
+        final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
 
 
-            if (this.changes == null) {
-                this.changes = changes;
+        final ActorContext actorContext = configDataStore.getActorContext();
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+                        "Shard-" + shardName, actorContext, shardName);
 
 
-                readFuture.set(changes.iterator().next());
-            }
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient distributedDataStoreClient;
+        try {
+            distributedDataStoreClient = SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (RuntimeException e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
         }
 
         }
 
-        @Override
-        public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
-            LOG.error("Read Listener failed. {}", causes);
-        }
+        final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+        final ClientTransaction tx = localHistory.createTransaction();
+        final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
+                tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
 
 
-        public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
-            if (changes != null) {
-                return Futures.immediateFuture(changes.iterator().next());
+        tx.abort();
+        localHistory.close();
+        try {
+            final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
+            if (!optional.isPresent()) {
+                return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+                        "data-missing", "Final read from id-ints is empty").buildFuture();
             }
 
             }
 
-            readFuture = SettableFuture.create();
-            return readFuture;
+            return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
+                    idIntsDdtl.checkEqual(optional.get()))).buildFuture();
+
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Unable to read data to verify ddtl data", e);
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
+        } finally {
+            distributedDataStoreClient.close();
+            clientActor.tell(PoisonPill.getInstance(), noSender());
         }
     }
 }
         }
     }
 }