Add info logging and improve error reporting in clustering-it
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
index f47a66ee15c8f15c40f585d992e2cd1e7df89295..bae07376cfa824422570a179c9c070f0b348ea56 100644 (file)
@@ -5,19 +5,43 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.clustering.it.provider;
 
 package org.opendaylight.controller.clustering.it.provider;
 
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
+import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
+import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
+import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
+import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
@@ -27,86 +51,159 @@ import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
 
     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
 
 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
 
     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
+    private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
+            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final DistributedShardFactory distributedShardFactory;
 
     private final RpcProviderRegistry rpcRegistry;
     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
     private final DistributedShardFactory distributedShardFactory;
+    private final DistributedDataStoreInterface configDataStore;
     private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
     private final NotificationPublishService notificationPublishService;
     private final NotificationService notificationService;
     private final DOMDataTreeService domDataTreeService;
     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
     private final DOMDataBroker domDataBroker;
     private final NotificationPublishService notificationPublishService;
     private final NotificationService notificationService;
-    private final SchemaService schemaService;
+    private final DOMSchemaService schemaService;
     private final ClusterSingletonServiceProvider singletonService;
     private final DOMRpcProviderService domRpcService;
     private final ClusterSingletonServiceProvider singletonService;
     private final DOMRpcProviderService domRpcService;
+    private final PrefixLeaderHandler prefixLeaderHandler;
+    private final PrefixShardHandler prefixShardHandler;
+    private final DOMDataTreeChangeService domDataTreeChangeService;
+    private final ActorSystem actorSystem;
 
 
-    private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
-            new HashMap<>();
+    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
+            routedRegistrations = new HashMap<>();
 
 
-    private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
-    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+    private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
+    private IdIntsListener idIntsListener;
+    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+    private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
+    private IdIntsDOMDataTreeLIstener idIntsDdtl;
+
+
 
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
                                      final ClusterSingletonServiceProvider singletonService,
 
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
                                      final ClusterSingletonServiceProvider singletonService,
-                                     final SchemaService schemaService,
+                                     final DOMSchemaService schemaService,
                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                      final NotificationPublishService notificationPublishService,
                                      final NotificationService notificationService,
                                      final DOMDataBroker domDataBroker,
                                      final DOMDataTreeService domDataTreeService,
                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                      final NotificationPublishService notificationPublishService,
                                      final NotificationService notificationService,
                                      final DOMDataBroker domDataBroker,
                                      final DOMDataTreeService domDataTreeService,
-                                     final DistributedShardFactory distributedShardFactory) {
+                                     final DistributedShardFactory distributedShardFactory,
+                                     final DistributedDataStoreInterface configDataStore,
+                                     final ActorSystemProvider actorSystemProvider) {
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
         this.rpcRegistry = rpcRegistry;
         this.domRpcService = domRpcService;
         this.singletonService = singletonService;
@@ -117,39 +214,48 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.domDataBroker = domDataBroker;
         this.domDataTreeService = domDataTreeService;
         this.distributedShardFactory = distributedShardFactory;
         this.domDataBroker = domDataBroker;
         this.domDataTreeService = domDataTreeService;
         this.distributedShardFactory = distributedShardFactory;
+        this.configDataStore = configDataStore;
+        this.actorSystem = actorSystemProvider.getActorSystem();
+
+        this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
+
+        domDataTreeChangeService =
+                (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
 
         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
 
         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
+
+        prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
+                bindingNormalizedNodeSerializer);
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterSingletonConstant() {
-        LOG.debug("unregister-singleton-constant");
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
+            final UnregisterSingletonConstantInput input) {
+        LOG.info("In unregisterSingletonConstant");
 
         if (getSingletonConstantRegistration == null) {
 
         if (getSingletonConstantRegistration == null) {
-            LOG.debug("No get-singleton-constant registration present.");
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
+                    "No prior RPC was registered").buildFuture();
         }
 
         try {
             getSingletonConstantRegistration.close();
             getSingletonConstantRegistration = null;
 
         }
 
         try {
             getSingletonConstantRegistration.close();
             getSingletonConstantRegistration = null;
 
-            return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
-        } catch (final Exception e) {
-            LOG.debug("There was a problem closing the singleton constant service", e);
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
+        } catch (Exception e) {
+            String msg = "Error closing the singleton constant service";
+            LOG.error(msg, e);
+            return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
+                    ErrorType.APPLICATION, msg, e).buildFuture();
         }
     }
 
     @Override
         }
     }
 
     @Override
-    public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
-        LOG.debug("publish-notifications, input: {}", input);
+    public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
+            final StartPublishNotificationsInput input) {
+        LOG.info("In startPublishNotifications - input: {}", input);
 
         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
                 input.getSeconds(), input.getNotificationsPerSecond());
 
         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
                 input.getSeconds(), input.getNotificationsPerSecond());
@@ -158,229 +264,306 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
 
         task.start();
 
 
         task.start();
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> subscribeDtcl() {
-        return null;
-    }
+    public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
+        LOG.info("In subscribeDtcl - input: {}", input);
 
 
-    @Override
-    public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
-        LOG.debug("write-transactions, input: {}", input);
+        if (dtclReg != null) {
+            return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
+        }
 
 
-        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
+        idIntsListener = new IdIntsListener();
 
 
-        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
-        writeTransactionsHandler.start(settableFuture);
+        dtclReg = domDataTreeChangeService
+                .registerDataTreeChangeListener(
+                        new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
+                                CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
+                        idIntsListener);
 
 
-        return settableFuture;
+        return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
-        return null;
+    public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
+        return WriteTransactionsHandler.start(domDataBroker, input);
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> becomeModuleLeader(BecomeModuleLeaderInput input) {
+    public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+    public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
+            final RemoveShardReplicaInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
-
-        LOG.debug("subscribe-ynl, input: {}", input);
+    public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
+        LOG.info("In subscribeYnl - input: {}", input);
 
         if (ynlRegistrations.containsKey(input.getId())) {
 
         if (ynlRegistrations.containsKey(input.getId())) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already ynl listener registered for this id: " + input.getId());
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
         }
 
         ynlRegistrations.put(input.getId(),
                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
 
         }
 
         ynlRegistrations.put(input.getId(),
                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> becomePrefixLeader(BecomePrefixLeaderInput input) {
-        return null;
+    public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
+        LOG.info("In removePrefixShard - input: {}", input);
+
+        return prefixShardHandler.onRemovePrefixShard(input);
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
+            final BecomePrefixLeaderInput input) {
+        LOG.info("n becomePrefixLeader - input: {}", input);
+
+        return prefixLeaderHandler.makeLeaderLocal(input);
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
-        LOG.debug("unregister-bound-constant, {}", input);
+    public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
+            final UnregisterBoundConstantInput input) {
+        LOG.info("In unregisterBoundConstant - {}", input);
 
 
-        final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
+        final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
                 routedRegistrations.remove(input.getContext());
 
                 routedRegistrations.remove(input.getContext());
 
-        if (registration == null) {
-            LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+        if (rpcRegistration == null) {
+            return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
         }
 
         }
 
-        registration.close();
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        rpcRegistration.close();
+        return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
-
-        LOG.debug("Received register-singleton-constant rpc, input: {}", input);
+    public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
+            final RegisterSingletonConstantInput input) {
+        LOG.info("In registerSingletonConstant - input: {}", input);
 
         if (input.getConstant() == null) {
 
         if (input.getConstant() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Constant value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
         }
 
         getSingletonConstantRegistration =
                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
 
         }
 
         getSingletonConstantRegistration =
                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+    public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
+            final RegisterDefaultConstantInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterConstant() {
+    public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
+            final UnregisterConstantInput input) {
+        LOG.info("In unregisterConstant");
 
         if (globalGetConstantRegistration == null) {
 
         if (globalGetConstantRegistration == null) {
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
-            final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
         }
 
         globalGetConstantRegistration.close();
         globalGetConstantRegistration = null;
 
         }
 
         globalGetConstantRegistration.close();
         globalGetConstantRegistration = null;
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
-        LOG.debug("unregister-flapping-singleton received.");
+    public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
+            final UnregisterFlappingSingletonInput input) {
+        LOG.info("In unregisterFlappingSingleton");
 
         if (flappingSingletonService == null) {
 
         if (flappingSingletonService == null) {
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
-            final RpcResult<UnregisterFlappingSingletonOutput> result =
-                    RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
         }
 
         final long flapCount = flappingSingletonService.setInactive();
         flappingSingletonService = null;
 
         }
 
         final long flapCount = flappingSingletonService.setInactive();
         flappingSingletonService = null;
 
-        final UnregisterFlappingSingletonOutput output =
-                new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
-
-        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+        return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
+                .buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+    public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<Void>> subscribeDdtl() {
-        return null;
+    public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
+        LOG.info("In subscribeDdtl");
+
+        if (ddtlReg != null) {
+            return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already a listener registered for id-ints").buildFuture();
+        }
+
+        idIntsDdtl = new IdIntsDOMDataTreeLIstener();
+
+        try {
+            ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
+                    Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
+                            ProduceTransactionsHandler.ID_INT_YID)),
+                    true, Collections.emptyList());
+        } catch (DOMDataTreeLoopException e) {
+            LOG.error("Failed to register DOMDataTreeListener", e);
+            return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
+                ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
+        }
+
+        return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
-        LOG.debug("register-bound-constant: {}", input);
+    public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
+            final RegisterBoundConstantInput input) {
+        LOG.info("In registerBoundConstant - input: {}", input);
 
         if (input.getContext() == null) {
 
         if (input.getContext() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Context value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
         }
 
         if (input.getConstant() == null) {
         }
 
         if (input.getConstant() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Constant value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
         }
 
         if (routedRegistrations.containsKey(input.getContext())) {
         }
 
         if (routedRegistrations.containsKey(input.getContext())) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already a rpc registered for context: " + input.getContext());
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
         }
 
         }
 
-        final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
+        final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
                         input.getConstant(), input.getContext());
 
                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
                         input.getConstant(), input.getContext());
 
-        routedRegistrations.put(input.getContext(), registration);
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        routedRegistrations.put(input.getContext(), rpcRegistration);
+        return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerFlappingSingleton() {
-        LOG.debug("Received register-flapping-singleton.");
+    public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
+            final RegisterFlappingSingletonInput input) {
+        LOG.info("In registerFlappingSingleton");
 
         if (flappingSingletonService != null) {
 
         if (flappingSingletonService != null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
+                "data-exists", "There is already an rpc registered").buildFuture();
         }
 
         flappingSingletonService = new FlappingSingletonService(singletonService);
 
         }
 
         flappingSingletonService = new FlappingSingletonService(singletonService);
 
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
-        return null;
+    public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
+        LOG.info("In unsubscribeDtcl");
+
+        if (idIntsListener == null || dtclReg == null) {
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
+                    ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
+        }
+
+        long timeout = 120L;
+        try {
+            idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Unable to finish notification processing", e);
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
+                    "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
+        }
+
+        dtclReg.close();
+        dtclReg = null;
+
+        if (!idIntsListener.hasTriggered()) {
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
+                    "id-ints listener has not received any notifications.").buildFuture();
+        }
+
+        final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+        try {
+            final Optional<NormalizedNode<?, ?>> readResult =
+                    rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
+
+            if (!readResult.isPresent()) {
+                return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
+                        "No data read from id-ints list").buildFuture();
+            }
+
+            final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
+            if (!nodesEqual) {
+                LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
+                        idIntsListener.diffWithLocalCopy(readResult.get()));
+            }
+
+            return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
+                    .buildFuture();
+
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.error("Final read of id-ints failed", e);
+            return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
+                    "Final read of id-ints failed", e).buildFuture();
+        }
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
+        LOG.info("In createPrefixShard - input: {}", input);
+
+        return prefixShardHandler.onCreatePrefixShard(input);
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> deconfigureIdIntsShard() {
+    public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
+            final DeconfigureIdIntsShardInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
-        LOG.debug("Received unsubscribe-ynl, input: {}", input);
+    public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
+        LOG.info("In unsubscribeYnl - input: {}", input);
 
         if (!ynlRegistrations.containsKey(input.getId())) {
 
         if (!ynlRegistrations.containsKey(input.getId())) {
-            final RpcError rpcError = RpcResultBuilder
-                    .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
-            final RpcResult<UnsubscribeYnlOutput> result =
-                    RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
-            return Futures.immediateFuture(result);
+            return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
+                ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
         }
 
         }
 
-        final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
-        final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
+        final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
+        final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
 
 
-        registration.close();
+        reg.close();
 
 
-        return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
+        return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
+    public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
             final CheckPublishNotificationsInput input) {
             final CheckPublishNotificationsInput input) {
+        LOG.info("In checkPublishNotifications - input: {}", input);
 
         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
 
 
         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
 
@@ -393,59 +576,187 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
 
         if (task.getLastError() != null) {
                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
 
         if (task.getLastError() != null) {
-            final StringWriter sw = new StringWriter();
-            final PrintWriter pw = new PrintWriter(sw);
-            task.getLastError().printStackTrace(pw);
-            checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
+            LOG.error("Last error for {}", task, task.getLastError());
+            checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
         }
 
         final CheckPublishNotificationsOutput output =
                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
 
         }
 
         final CheckPublishNotificationsOutput output =
                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
 
-        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+        return RpcResultBuilder.success(output).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
-        LOG.debug("producer-transactions, input: {}", input);
-
-        final ProduceTransactionsHandler handler =
-                new ProduceTransactionsHandler(domDataTreeService, input);
+    public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
+            final ProduceTransactionsInput input) {
+        LOG.info("In produceTransactions - input: {}", input);
+        return ProduceTransactionsHandler.start(domDataTreeService, input);
+    }
 
 
-        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
-        handler.start(settableFuture);
+    @Override
+    public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
+            final ShutdownShardReplicaInput input) {
+        LOG.info("In shutdownShardReplica - input: {}", input);
+
+        final String shardName = input.getShardName();
+        if (Strings.isNullOrEmpty(shardName)) {
+            return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
+                shardName + "is not a valid shard name").buildFuture();
+        }
 
 
-        return settableFuture;
+        return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
+    public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
+            final ShutdownPrefixShardReplicaInput input) {
+        LOG.info("shutdownPrefixShardReplica - input: {}", input);
+
+        final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+        if (shardPrefix == null) {
+            return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
+                    "A valid shard prefix must be specified").buildFuture();
+        }
+
+        final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+        final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
+
+        return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
+    }
+
+    private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
+        final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
+        final ActorContext context = configDataStore.getActorContext();
+
+        long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+                .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+        final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+        final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+        context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable throwable, final ActorRef actorRef) {
+                if (throwable != null) {
+                    shutdownShardAsk.failure(throwable);
+                } else {
+                    shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+                }
+            }
+        }, context.getClientDispatcher());
+
+        shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
+                if (throwable != null) {
+                    final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
+                            .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+                    rpcResult.set(failedResult);
+                } else {
+                    // according to Patterns.gracefulStop API, we don't have to
+                    // check value of gracefulStopResult
+                    rpcResult.set(RpcResultBuilder.success(success).build());
+                }
+            }
+        }, context.getClientDispatcher());
+        return rpcResult;
+    }
 
 
-        LOG.debug("Received register-constant rpc, input: {}", input);
+    @Override
+    public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
+        LOG.info("In registerConstant - input: {}", input);
 
         if (input.getConstant() == null) {
 
         if (input.getConstant() == null) {
-            final RpcError error = RpcResultBuilder.newError(
-                    ErrorType.RPC, "Invalid input.", "Constant value is null");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
+                    ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
         }
 
         if (globalGetConstantRegistration != null) {
         }
 
         if (globalGetConstantRegistration != null) {
-            final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
-                    "There is already a get-constant rpc registered.");
-            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
+                    "data-exists", "There is already an rpc registered").buildFuture();
         }
 
         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
         }
 
         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
-        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
+        return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> unregisterDefaultConstant() {
+    public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
+            final UnregisterDefaultConstantInput input) {
         return null;
     }
 
     @Override
         return null;
     }
 
     @Override
-    public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
-        return null;
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
+        LOG.info("In unsubscribeDdtl");
+
+        if (idIntsDdtl == null || ddtlReg == null) {
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
+                    ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
+        }
+
+        long timeout = 120L;
+        try {
+            idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Unable to finish notification processing", e);
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+                    "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
+        }
+
+        ddtlReg.close();
+        ddtlReg = null;
+
+        if (!idIntsDdtl.hasTriggered()) {
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+                    "No notification received.", "id-ints listener has not received any notifications").buildFuture();
+        }
+
+        final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+        final ActorContext actorContext = configDataStore.getActorContext();
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+                        "Shard-" + shardName, actorContext, shardName);
+
+        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient distributedDataStoreClient;
+        try {
+            distributedDataStoreClient = SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (RuntimeException e) {
+            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
+        }
+
+        final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+        final ClientTransaction tx = localHistory.createTransaction();
+        final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
+                tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
+
+        tx.abort();
+        localHistory.close();
+        try {
+            final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
+            if (!optional.isPresent()) {
+                return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
+                        "data-missing", "Final read from id-ints is empty").buildFuture();
+            }
+
+            return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
+                    idIntsDdtl.checkEqual(optional.get()))).buildFuture();
+
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Unable to read data to verify ddtl data", e);
+            return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
+        } finally {
+            distributedDataStoreClient.close();
+            clientActor.tell(PoisonPill.getInstance(), noSender());
+        }
     }
 }
     }
 }