Remove DOMDataTreeProducer-related classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardedDataTreeActor.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
deleted file mode 100644 (file)
index 52c3d25..0000000
+++ /dev/null
@@ -1,829 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.Status.Success;
-import akka.cluster.ClusterEvent;
-import akka.cluster.ClusterEvent.MemberExited;
-import akka.cluster.ClusterEvent.MemberRemoved;
-import akka.cluster.ClusterEvent.MemberUp;
-import akka.cluster.ClusterEvent.MemberWeaklyUp;
-import akka.cluster.ClusterEvent.ReachableMember;
-import akka.cluster.ClusterEvent.UnreachableMember;
-import akka.cluster.Member;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
-import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
-import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
-import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
-import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.compat.java8.FutureConverters;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
- * nodes of newly open producers/shards on the local node.
- */
-@Deprecated(forRemoval = true)
-public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
-
-    private static final String PERSISTENCE_ID = "sharding-service-actor";
-    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
-
-    static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
-
-    private final DistributedShardedDOMDataTree shardingService;
-    private final ActorSystem actorSystem;
-    private final ClusterWrapper clusterWrapper;
-    // helper actorContext used only for static calls to executeAsync etc
-    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
-    private final ActorUtils actorUtils;
-    private final ShardingServiceAddressResolver resolver;
-    private final DistributedDataStoreInterface distributedConfigDatastore;
-    private final DistributedDataStoreInterface distributedOperDatastore;
-    private final int lookupTaskMaxRetries;
-
-    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
-
-    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
-        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
-
-        shardingService = builder.getShardingService();
-        actorSystem = builder.getActorSystem();
-        clusterWrapper = builder.getClusterWrapper();
-        distributedConfigDatastore = builder.getDistributedConfigDatastore();
-        distributedOperDatastore = builder.getDistributedOperDatastore();
-        lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
-        actorUtils = distributedConfigDatastore.getActorUtils();
-        resolver = new ShardingServiceAddressResolver(
-                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
-
-        clusterWrapper.subscribeToMemberEvents(self());
-    }
-
-    @Override
-    public void preStart() {
-    }
-
-    @Override
-    protected void handleRecover(final Object message) {
-        LOG.debug("Received a recover message {}", message);
-    }
-
-    @Override
-    protected void handleCommand(final Object message) {
-        LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
-        if (message instanceof ClusterEvent.MemberUp) {
-            memberUp((ClusterEvent.MemberUp) message);
-        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
-            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
-        } else if (message instanceof ClusterEvent.MemberExited) {
-            memberExited((ClusterEvent.MemberExited) message);
-        } else if (message instanceof ClusterEvent.MemberRemoved) {
-            memberRemoved((ClusterEvent.MemberRemoved) message);
-        } else if (message instanceof ClusterEvent.UnreachableMember) {
-            memberUnreachable((ClusterEvent.UnreachableMember) message);
-        } else if (message instanceof ClusterEvent.ReachableMember) {
-            memberReachable((ClusterEvent.ReachableMember) message);
-        } else if (message instanceof ProducerCreated) {
-            onProducerCreated((ProducerCreated) message);
-        } else if (message instanceof NotifyProducerCreated) {
-            onNotifyProducerCreated((NotifyProducerCreated) message);
-        } else if (message instanceof ProducerRemoved) {
-            onProducerRemoved((ProducerRemoved) message);
-        } else if (message instanceof NotifyProducerRemoved) {
-            onNotifyProducerRemoved((NotifyProducerRemoved) message);
-        } else if (message instanceof PrefixShardCreated) {
-            onPrefixShardCreated((PrefixShardCreated) message);
-        } else if (message instanceof LookupPrefixShard) {
-            onLookupPrefixShard((LookupPrefixShard) message);
-        } else if (message instanceof PrefixShardRemovalLookup) {
-            onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
-        } else if (message instanceof PrefixShardRemoved) {
-            onPrefixShardRemoved((PrefixShardRemoved) message);
-        } else if (message instanceof StartConfigShardLookup) {
-            onStartConfigShardLookup((StartConfigShardLookup) message);
-        }
-    }
-
-    @Override
-    public String persistenceId() {
-        return PERSISTENCE_ID;
-    }
-
-    private void memberUp(final MemberUp message) {
-        final MemberName memberName = memberToName(message.member());
-
-        LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
-                message.member().address());
-
-        resolver.addPeerAddress(memberName, message.member().address());
-    }
-
-    private void memberWeaklyUp(final MemberWeaklyUp message) {
-        final MemberName memberName = memberToName(message.member());
-
-        LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
-                message.member().address());
-
-        resolver.addPeerAddress(memberName, message.member().address());
-    }
-
-    private void memberExited(final MemberExited message) {
-        final MemberName memberName = memberToName(message.member());
-
-        LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
-                message.member().address());
-
-        resolver.removePeerAddress(memberName);
-    }
-
-    private void memberRemoved(final MemberRemoved message) {
-        final MemberName memberName = memberToName(message.member());
-
-        LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
-                message.member().address());
-
-        resolver.removePeerAddress(memberName);
-    }
-
-    private void memberUnreachable(final UnreachableMember message) {
-        final MemberName memberName = memberToName(message.member());
-        LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
-
-        resolver.removePeerAddress(memberName);
-    }
-
-    private void memberReachable(final ReachableMember message) {
-        final MemberName memberName = memberToName(message.member());
-        LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
-
-        resolver.addPeerAddress(memberName, message.member().address());
-    }
-
-    private void onProducerCreated(final ProducerCreated message) {
-        LOG.debug("Received ProducerCreated: {}", message);
-
-        // fastpath if we have no peers
-        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
-            getSender().tell(new Status.Success(null), ActorRef.noSender());
-        }
-
-        final ActorRef sender = getSender();
-        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
-
-        final List<CompletableFuture<Object>> futures = new ArrayList<>();
-
-        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
-            final ActorSelection actorSelection = actorSystem.actorSelection(address);
-            futures.add(
-                    FutureConverters.toJava(
-                            actorUtils.executeOperationAsync(
-                                    actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
-                    .toCompletableFuture());
-        }
-
-        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-                futures.toArray(new CompletableFuture[futures.size()]));
-
-        combinedFuture
-                .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
-                .exceptionally(throwable -> {
-                    sender.tell(new Status.Failure(throwable), self());
-                    return null;
-                });
-    }
-
-    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
-        LOG.debug("Received NotifyProducerCreated: {}", message);
-
-        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
-
-        try {
-            final ActorProducerRegistration registration =
-                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
-            subtrees.forEach(id -> idToProducer.put(id, registration));
-            sender().tell(new Status.Success(null), self());
-        } catch (final IllegalArgumentException e) {
-            sender().tell(new Status.Failure(e), getSelf());
-        }
-    }
-
-    private void onProducerRemoved(final ProducerRemoved message) {
-        LOG.debug("Received ProducerRemoved: {}", message);
-
-        final List<CompletableFuture<Object>> futures = new ArrayList<>();
-
-        for (final String address : resolver.getShardingServicePeerActorAddresses()) {
-            final ActorSelection selection = actorSystem.actorSelection(address);
-
-            futures.add(FutureConverters.toJava(
-                    actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
-                    .toCompletableFuture());
-        }
-
-        final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-                futures.toArray(new CompletableFuture[futures.size()]));
-
-        final ActorRef respondTo = getSender();
-
-        combinedFuture
-                .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
-                .exceptionally(e -> {
-                    respondTo.tell(new Status.Failure(null), self());
-                    return null;
-                });
-
-    }
-
-    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
-        LOG.debug("Received NotifyProducerRemoved: {}", message);
-
-        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
-        if (registration == null) {
-            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
-            getSender().tell(new Status.Success(null), ActorRef.noSender());
-            return;
-        }
-
-        try {
-            registration.close();
-            getSender().tell(new Status.Success(null), ActorRef.noSender());
-        } catch (final DOMDataTreeProducerException e) {
-            LOG.error("Unable to close producer", e);
-            getSender().tell(new Status.Failure(e), ActorRef.noSender());
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onLookupPrefixShard(final LookupPrefixShard message) {
-        LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
-
-        final DOMDataTreeIdentifier prefix = message.getPrefix();
-
-        final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
-                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
-
-        // schedule a notification task for the reply
-        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
-                new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
-                        utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
-    }
-
-    private void onPrefixShardCreated(final PrefixShardCreated message) {
-        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
-
-        final PrefixShardConfiguration config = message.getConfiguration();
-
-        shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
-    }
-
-    private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
-        LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
-
-        final ShardRemovalLookupTask removalTask =
-                new ShardRemovalLookupTask(actorSystem, getSender(),
-                        actorUtils, message.getPrefix(), lookupTaskMaxRetries);
-
-        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
-    }
-
-    private void onPrefixShardRemoved(final PrefixShardRemoved message) {
-        LOG.debug("Received PrefixShardRemoved: {}", message);
-
-        shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
-    }
-
-    private void onStartConfigShardLookup(final StartConfigShardLookup message) {
-        LOG.debug("Received StartConfigShardLookup: {}", message);
-
-        final ActorUtils context =
-                message.getType().equals(LogicalDatastoreType.CONFIGURATION)
-                        ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
-
-        // schedule a notification task for the reply
-        actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
-                new ConfigShardLookupTask(
-                        actorSystem, getSender(), context, message, lookupTaskMaxRetries),
-                actorSystem.dispatcher());
-    }
-
-    private static MemberName memberToName(final Member member) {
-        return MemberName.forName(member.roles().iterator().next());
-    }
-
-    private class ActorProducerRegistration {
-
-        private final DOMDataTreeProducer producer;
-        private final Collection<DOMDataTreeIdentifier> subtrees;
-
-        ActorProducerRegistration(final DOMDataTreeProducer producer,
-                                  final Collection<DOMDataTreeIdentifier> subtrees) {
-            this.producer = producer;
-            this.subtrees = subtrees;
-        }
-
-        void close() throws DOMDataTreeProducerException {
-            producer.close();
-            subtrees.forEach(idToProducer::remove);
-        }
-    }
-
-    private static class ShardFrontendRegistration extends
-            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
-
-        private final ActorRef clientActor;
-        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
-
-        ShardFrontendRegistration(final ActorRef clientActor,
-                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
-            super(shardRegistration);
-            this.clientActor = clientActor;
-            this.shardRegistration = shardRegistration;
-        }
-
-        @Override
-        protected void removeRegistration() {
-            shardRegistration.close();
-            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-        }
-    }
-
-    /**
-     * Handles the lookup step of cds shard creation once the configuration is updated.
-     */
-    private static class ShardCreationLookupTask extends LookupTask {
-
-        private final ActorSystem system;
-        private final ActorRef replyTo;
-        private final ClusterWrapper clusterWrapper;
-        private final ActorUtils context;
-        private final DistributedShardedDOMDataTree shardingService;
-        private final DOMDataTreeIdentifier toLookup;
-        private final int lookupMaxRetries;
-
-        ShardCreationLookupTask(final ActorSystem system,
-                                final ActorRef replyTo,
-                                final ClusterWrapper clusterWrapper,
-                                final ActorUtils context,
-                                final DistributedShardedDOMDataTree shardingService,
-                                final DOMDataTreeIdentifier toLookup,
-                                final int lookupMaxRetries) {
-            super(replyTo, lookupMaxRetries);
-            this.system = system;
-            this.replyTo = replyTo;
-            this.clusterWrapper = clusterWrapper;
-            this.context = context;
-            this.shardingService = shardingService;
-            this.toLookup = toLookup;
-            this.lookupMaxRetries = lookupMaxRetries;
-        }
-
-        @Override
-        public void run() {
-            final Future<ActorRef> localShardFuture =
-                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
-
-            localShardFuture.onComplete(new OnComplete<ActorRef>() {
-                @Override
-                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
-                    if (throwable != null) {
-                        tryReschedule(throwable);
-                    } else {
-                        LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
-
-                        system.scheduler().scheduleOnce(
-                                SHARD_LOOKUP_TASK_INTERVAL,
-                                new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
-                                        shardingService, toLookup, lookupMaxRetries),
-                                system.dispatcher());
-                    }
-                }
-            }, system.dispatcher());
-        }
-
-        @Override
-        void reschedule(final int retries) {
-            LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
-            system.scheduler().scheduleOnce(
-                    SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
-        }
-    }
-
-    /**
-     * Handles the readiness step by waiting for a leader of the created shard.
-     */
-    private static class ShardLeaderLookupTask extends LookupTask {
-
-        private final ActorSystem system;
-        private final ActorRef replyTo;
-        private final ActorUtils context;
-        private final ClusterWrapper clusterWrapper;
-        private final ActorRef shard;
-        private final DistributedShardedDOMDataTree shardingService;
-        private final DOMDataTreeIdentifier toLookup;
-        private final int lookupMaxRetries;
-
-        ShardLeaderLookupTask(final ActorSystem system,
-                              final ActorRef replyTo,
-                              final ActorUtils context,
-                              final ClusterWrapper clusterWrapper,
-                              final ActorRef shard,
-                              final DistributedShardedDOMDataTree shardingService,
-                              final DOMDataTreeIdentifier toLookup,
-                              final int lookupMaxRetries) {
-            super(replyTo, lookupMaxRetries);
-            this.system = system;
-            this.replyTo = replyTo;
-            this.context = context;
-            this.clusterWrapper = clusterWrapper;
-            this.shard = shard;
-            this.shardingService = shardingService;
-            this.toLookup = toLookup;
-            this.lookupMaxRetries = lookupMaxRetries;
-        }
-
-        @Override
-        public void run() {
-
-            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
-
-            ask.onComplete(new OnComplete<>() {
-                @Override
-                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
-                    if (throwable != null) {
-                        tryReschedule(throwable);
-                    } else {
-                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
-                        final Optional<String> leaderActor = findLeader.getLeaderActor();
-                        if (leaderActor.isPresent()) {
-                            // leader is found, backend seems ready, check if the frontend is ready
-                            LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
-                                    clusterWrapper.getCurrentMemberName(), toLookup);
-                            system.scheduler().scheduleOnce(
-                                    SHARD_LOOKUP_TASK_INTERVAL,
-                                    new FrontendLookupTask(
-                                            system, replyTo, shardingService, toLookup, lookupMaxRetries),
-                                    system.dispatcher());
-                        } else {
-                            tryReschedule(null);
-                        }
-                    }
-                }
-            }, system.dispatcher());
-
-        }
-
-        @Override
-        void reschedule(final int retries) {
-            LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
-                    clusterWrapper.getCurrentMemberName(), toLookup, retries);
-            system.scheduler().scheduleOnce(
-                    SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
-        }
-    }
-
-    /**
-     * After backend is ready this handles the last step - checking if we have a frontend shard for the backend,
-     * once this completes(which should be ready by the time the backend is created, this is just a sanity check in
-     * case they race), the future for the cds shard creation is completed and the shard is ready for use.
-     */
-    private static final class FrontendLookupTask extends LookupTask {
-
-        private final ActorSystem system;
-        private final ActorRef replyTo;
-        private final DistributedShardedDOMDataTree shardingService;
-        private final DOMDataTreeIdentifier toLookup;
-
-        FrontendLookupTask(final ActorSystem system,
-                           final ActorRef replyTo,
-                           final DistributedShardedDOMDataTree shardingService,
-                           final DOMDataTreeIdentifier toLookup,
-                           final int lookupMaxRetries) {
-            super(replyTo, lookupMaxRetries);
-            this.system = system;
-            this.replyTo = replyTo;
-            this.shardingService = shardingService;
-            this.toLookup = toLookup;
-        }
-
-        @Override
-        public void run() {
-            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
-                    shardingService.lookupShardFrontend(toLookup);
-
-            if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
-                replyTo.tell(new Success(null), ActorRef.noSender());
-            } else {
-                tryReschedule(null);
-            }
-        }
-
-        private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
-                                          final DOMDataTreeIdentifier prefix) {
-            if (entry == null) {
-                return false;
-            }
-
-            if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) {
-                return true;
-            }
-
-            if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
-                return true;
-            }
-
-            return false;
-        }
-
-        @Override
-        void reschedule(final int retries) {
-            LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
-            system.scheduler().scheduleOnce(
-                    SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
-        }
-    }
-
-    /**
-     * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the
-     * configuration.
-     */
-    private static class ShardRemovalLookupTask extends LookupTask {
-
-        private final ActorSystem system;
-        private final ActorRef replyTo;
-        private final ActorUtils context;
-        private final DOMDataTreeIdentifier toLookup;
-
-        ShardRemovalLookupTask(final ActorSystem system,
-                               final ActorRef replyTo,
-                               final ActorUtils context,
-                               final DOMDataTreeIdentifier toLookup,
-                               final int lookupMaxRetries) {
-            super(replyTo, lookupMaxRetries);
-            this.system = system;
-            this.replyTo = replyTo;
-            this.context = context;
-            this.toLookup = toLookup;
-        }
-
-        @Override
-        public void run() {
-            final Future<ActorRef> localShardFuture =
-                    context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
-
-            localShardFuture.onComplete(new OnComplete<ActorRef>() {
-                @Override
-                public void onComplete(final Throwable throwable, final ActorRef actorRef) {
-                    if (throwable != null) {
-                        //TODO Shouldn't we check why findLocalShard failed?
-                        LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
-                                toLookup);
-                        replyTo.tell(new Success(null), ActorRef.noSender());
-                    } else {
-                        tryReschedule(null);
-                    }
-                }
-            }, system.dispatcher());
-        }
-
-        @Override
-        void reschedule(final int retries) {
-            LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
-                    toLookup, retries);
-            system.scheduler().scheduleOnce(
-                    SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
-        }
-    }
-
-    /**
-     * Task for handling the lookup of the backend for the configuration shard.
-     */
-    private static class ConfigShardLookupTask extends LookupTask {
-
-        private final ActorSystem system;
-        private final ActorRef replyTo;
-        private final ActorUtils context;
-
-        ConfigShardLookupTask(final ActorSystem system,
-                              final ActorRef replyTo,
-                              final ActorUtils context,
-                              final StartConfigShardLookup message,
-                              final int lookupMaxRetries) {
-            super(replyTo, lookupMaxRetries);
-            this.system = system;
-            this.replyTo = replyTo;
-            this.context = context;
-        }
-
-        @Override
-        void reschedule(final int retries) {
-            LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
-            system.scheduler().scheduleOnce(
-                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
-        }
-
-        @Override
-        public void run() {
-            final Optional<ActorRef> localShard =
-                    context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
-
-            if (!localShard.isPresent()) {
-                tryReschedule(null);
-            } else {
-                LOG.debug("Local backend for prefix configuration shard lookup successful");
-                replyTo.tell(new Status.Success(null), ActorRef.noSender());
-            }
-        }
-    }
-
-    /**
-     * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
-     */
-    private static class ConfigShardReadinessTask extends LookupTask {
-
-        private final ActorSystem system;
-        private final ActorRef replyTo;
-        private final ActorUtils context;
-        private final ClusterWrapper clusterWrapper;
-        private final ActorRef shard;
-
-        ConfigShardReadinessTask(final ActorSystem system,
-                                 final ActorRef replyTo,
-                                 final ActorUtils context,
-                                 final ClusterWrapper clusterWrapper,
-                                 final ActorRef shard,
-                                 final int lookupMaxRetries) {
-            super(replyTo, lookupMaxRetries);
-            this.system = system;
-            this.replyTo = replyTo;
-            this.context = context;
-            this.clusterWrapper = clusterWrapper;
-            this.shard = shard;
-        }
-
-        @Override
-        void reschedule(final int retries) {
-            LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
-                    clusterWrapper.getCurrentMemberName(), retries);
-            system.scheduler().scheduleOnce(
-                    SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
-        }
-
-        @Override
-        public void run() {
-            final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
-
-            ask.onComplete(new OnComplete<>() {
-                @Override
-                public void onComplete(final Throwable throwable, final Object findLeaderReply) {
-                    if (throwable != null) {
-                        tryReschedule(throwable);
-                    } else {
-                        final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
-                        final Optional<String> leaderActor = findLeader.getLeaderActor();
-                        if (leaderActor.isPresent()) {
-                            // leader is found, backend seems ready, check if the frontend is ready
-                            LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
-                                    clusterWrapper.getCurrentMemberName());
-                            replyTo.tell(new Status.Success(null), ActorRef.noSender());
-                        } else {
-                            tryReschedule(null);
-                        }
-                    }
-                }
-            }, system.dispatcher());
-        }
-    }
-
-    public static class ShardedDataTreeActorCreator {
-
-        private DistributedShardedDOMDataTree shardingService;
-        private DistributedDataStoreInterface distributedConfigDatastore;
-        private DistributedDataStoreInterface distributedOperDatastore;
-        private ActorSystem actorSystem;
-        private ClusterWrapper cluster;
-        private int maxRetries;
-
-        public DistributedShardedDOMDataTree getShardingService() {
-            return shardingService;
-        }
-
-        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
-            this.shardingService = shardingService;
-            return this;
-        }
-
-        public ActorSystem getActorSystem() {
-            return actorSystem;
-        }
-
-        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
-            this.actorSystem = actorSystem;
-            return this;
-        }
-
-        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
-            this.cluster = clusterWrapper;
-            return this;
-        }
-
-        public ClusterWrapper getClusterWrapper() {
-            return cluster;
-        }
-
-        public DistributedDataStoreInterface getDistributedConfigDatastore() {
-            return distributedConfigDatastore;
-        }
-
-        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
-                final DistributedDataStoreInterface distributedConfigDatastore) {
-            this.distributedConfigDatastore = distributedConfigDatastore;
-            return this;
-        }
-
-        public DistributedDataStoreInterface getDistributedOperDatastore() {
-            return distributedOperDatastore;
-        }
-
-        public ShardedDataTreeActorCreator setDistributedOperDatastore(
-                final DistributedDataStoreInterface distributedOperDatastore) {
-            this.distributedOperDatastore = distributedOperDatastore;
-            return this;
-        }
-
-        public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
-            this.maxRetries = newMaxRetries;
-            return this;
-        }
-
-        public int getLookupTaskMaxRetries() {
-            return maxRetries;
-        }
-
-        private void verify() {
-            requireNonNull(shardingService);
-            requireNonNull(actorSystem);
-            requireNonNull(cluster);
-            requireNonNull(distributedConfigDatastore);
-            requireNonNull(distributedOperDatastore);
-        }
-
-        public Props props() {
-            verify();
-            return Props.create(ShardedDataTreeActor.class, this);
-        }
-    }
-}