X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardedDataTreeActor.java;h=3efbbabeb8fb1a6335995bf236d86098f29093be;hp=04a75628b7d8d6583b661b27ce71d9f6a7e3ccac;hb=8232a626b43fdd2f5799da0fbcfb0f02d3c8f4fb;hpb=20f8f30f4bbf1e982672c1f883a6a18b0e4539de diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index 04a75628b7..3efbbabeb8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -16,9 +16,7 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Status; -import akka.actor.Status.Failure; import akka.actor.Status.Success; -import akka.cluster.Cluster; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberExited; import akka.cluster.ClusterEvent.MemberRemoved; @@ -27,46 +25,38 @@ import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.ClusterEvent.ReachableMember; import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.Member; -import akka.cluster.ddata.DistributedData; -import akka.cluster.ddata.ORMap; -import akka.cluster.ddata.Replicator; -import akka.cluster.ddata.Replicator.Changed; -import akka.cluster.ddata.Replicator.Subscribe; -import akka.cluster.ddata.Replicator.Update; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; -import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup; import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated; import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved; -import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard; +import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; @@ -94,7 +84,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS); static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS); - static final int LOOKUP_TASK_MAX_RETRIES = 100; private final DistributedShardedDOMDataTree shardingService; private final ActorSystem actorSystem; @@ -103,17 +92,11 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore private final ActorContext actorContext; private final ShardingServiceAddressResolver resolver; - private final DistributedDataStore distributedConfigDatastore; - private final DistributedDataStore distributedOperDatastore; + private final AbstractDataStore distributedConfigDatastore; + private final AbstractDataStore distributedOperDatastore; + private final int lookupTaskMaxRetries; private final Map idToProducer = new HashMap<>(); - private final Map idToShardRegistration = new HashMap<>(); - - private final Cluster cluster; - private final ActorRef replicator; - - private ORMap currentData = ORMap.create(); - private Map currentConfiguration = new HashMap<>(); ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) { LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName()); @@ -123,21 +106,16 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { clusterWrapper = builder.getClusterWrapper(); distributedConfigDatastore = builder.getDistributedConfigDatastore(); distributedOperDatastore = builder.getDistributedOperDatastore(); + lookupTaskMaxRetries = builder.getLookupTaskMaxRetries(); actorContext = distributedConfigDatastore.getActorContext(); resolver = new ShardingServiceAddressResolver( DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName()); clusterWrapper.subscribeToMemberEvents(self()); - cluster = Cluster.get(actorSystem); - - replicator = DistributedData.get(context().system()).replicator(); } @Override public void preStart() { - final Subscribe> subscribe = - new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); - replicator.tell(subscribe, noSender()); } @Override @@ -147,7 +125,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { @Override protected void handleCommand(final Object message) throws Exception { - LOG.debug("Received {}", message); + LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message); if (message instanceof ClusterEvent.MemberUp) { memberUp((ClusterEvent.MemberUp) message); } else if (message instanceof ClusterEvent.MemberWeaklyUp) { @@ -160,8 +138,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { memberUnreachable((ClusterEvent.UnreachableMember) message); } else if (message instanceof ClusterEvent.ReachableMember) { memberReachable((ClusterEvent.ReachableMember) message); - } else if (message instanceof Changed) { - onConfigChanged((Changed) message); } else if (message instanceof ProducerCreated) { onProducerCreated((ProducerCreated) message); } else if (message instanceof NotifyProducerCreated) { @@ -172,51 +148,17 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { onNotifyProducerRemoved((NotifyProducerRemoved) message); } else if (message instanceof PrefixShardCreated) { onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof CreatePrefixShard) { - onCreatePrefixShard((CreatePrefixShard) message); - } else if (message instanceof RemovePrefixShard) { - onRemovePrefixShard((RemovePrefixShard) message); + } else if (message instanceof LookupPrefixShard) { + onLookupPrefixShard((LookupPrefixShard) message); + } else if (message instanceof PrefixShardRemovalLookup) { + onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message); } else if (message instanceof PrefixShardRemoved) { onPrefixShardRemoved((PrefixShardRemoved) message); + } else if (message instanceof StartConfigShardLookup) { + onStartConfigShardLookup((StartConfigShardLookup) message); } } - private void onConfigChanged(final Changed> change) { - LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change); - - currentData = change.dataValue(); - final Map changedConfig = change.dataValue().getEntries(); - - LOG.debug("Changed set {}", changedConfig); - - try { - final Map newConfig = - changedConfig.values().stream().collect( - Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity())); - resolveConfig(newConfig); - } catch (final IllegalStateException e) { - LOG.error("Failed, ", e); - } - - } - - private void resolveConfig(final Map newConfig) { - - // get the removed configurations - final SetView deleted = - Sets.difference(currentConfiguration.keySet(), newConfig.keySet()); - shardingService.resolveShardRemovals(deleted); - - // get the added configurations - final SetView additions = - Sets.difference(newConfig.keySet(), currentConfiguration.keySet()); - shardingService.resolveShardAdditions(additions); - // we can ignore those that existed previously since the potential changes in replicas will be handled by - // shard manager. - - currentConfiguration = new HashMap<>(newConfig); - } - @Override public String persistenceId() { return PERSISTENCE_ID; @@ -297,12 +239,12 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final CompletableFuture combinedFuture = CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()])); - combinedFuture.thenRun(() -> { - sender.tell(new Status.Success(null), noSender()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + combinedFuture + .thenRun(() -> sender.tell(new Success(null), noSender())) + .exceptionally(throwable -> { + sender.tell(new Status.Failure(throwable), self()); + return null; + }); } private void onNotifyProducerCreated(final NotifyProducerCreated message) { @@ -367,61 +309,34 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } @SuppressWarnings("checkstyle:IllegalCatch") - private void onCreatePrefixShard(final CreatePrefixShard message) { - LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); + private void onLookupPrefixShard(final LookupPrefixShard message) { + LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - final PrefixShardConfiguration configuration = message.getConfiguration(); + final DOMDataTreeIdentifier prefix = message.getPrefix(); - final Update> update = - new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), - map -> map.put(cluster, configuration.toDataMapKey(), configuration)); - - replicator.tell(update, self()); + final ActorContext context = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION + ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext(); // schedule a notification task for the reply actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper, - actorContext, shardingService, configuration.getPrefix()), - actorSystem.dispatcher()); + context, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher()); } private void onPrefixShardCreated(final PrefixShardCreated message) { LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message); - final Collection addresses = resolver.getShardingServicePeerActorAddresses(); - final ActorRef sender = getSender(); - - final List> futures = new ArrayList<>(); - - for (final String address : addresses) { - final ActorSelection actorSelection = actorSystem.actorSelection(address); - futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection, - new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture()); - } + final PrefixShardConfiguration config = message.getConfiguration(); - final CompletableFuture combinedFuture = - CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); - - combinedFuture.thenRun(() -> { - sender.tell(new Status.Success(null), self()); - }).exceptionally(throwable -> { - sender.tell(new Status.Failure(throwable), self()); - return null; - }); + shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix())); } - private void onRemovePrefixShard(final RemovePrefixShard message) { - LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message); - - //TODO the removal message should have the configuration or some other way to get to the key - final Update> removal = - new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(), - map -> map.remove(cluster, "prefix=" + message.getPrefix())); - replicator.tell(removal, self()); + private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) { + LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message); final ShardRemovalLookupTask removalTask = new ShardRemovalLookupTask(actorSystem, getSender(), - actorContext, message.getPrefix()); + actorContext, message.getPrefix(), lookupTaskMaxRetries); actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher()); } @@ -429,15 +344,21 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private void onPrefixShardRemoved(final PrefixShardRemoved message) { LOG.debug("Received PrefixShardRemoved: {}", message); - final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix()); + shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix())); + } - if (registration == null) { - LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}", - message.getPrefix(), idToShardRegistration); - return; - } + private void onStartConfigShardLookup(final StartConfigShardLookup message) { + LOG.debug("Received StartConfigShardLookup: {}", message); - registration.close(); + final ActorContext context = + message.getType().equals(LogicalDatastoreType.CONFIGURATION) + ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext(); + + // schedule a notification task for the reply + actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, + new ConfigShardLookupTask( + actorSystem, getSender(), context, message, lookupTaskMaxRetries), + actorSystem.dispatcher()); } private static MemberName memberToName(final Member member) { @@ -481,39 +402,6 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } - private abstract static class LookupTask implements Runnable { - - private final ActorRef replyTo; - private int retries = 0; - - private LookupTask(final ActorRef replyTo) { - this.replyTo = replyTo; - } - - abstract void reschedule(int retries); - - void tryReschedule(@Nullable final Throwable throwable) { - if (retries <= LOOKUP_TASK_MAX_RETRIES) { - retries++; - reschedule(retries); - } else { - fail(throwable); - } - } - - void fail(@Nullable final Throwable throwable) { - if (throwable == null) { - replyTo.tell(new Failure( - new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." - + "Failing..")), noSender()); - } else { - replyTo.tell(new Failure( - new DOMDataTreeShardCreationFailedException("Unable to find the backend shard." - + "Failing..", throwable)), noSender()); - } - } - } - /** * Handles the lookup step of cds shard creation once the configuration is updated. */ @@ -525,20 +413,23 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorContext context; private final DistributedShardedDOMDataTree shardingService; private final DOMDataTreeIdentifier toLookup; + private final int lookupMaxRetries; ShardCreationLookupTask(final ActorSystem system, final ActorRef replyTo, final ClusterWrapper clusterWrapper, final ActorContext context, final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.clusterWrapper = clusterWrapper; this.context = context; this.shardingService = shardingService; this.toLookup = toLookup; + this.lookupMaxRetries = lookupMaxRetries; } @Override @@ -557,7 +448,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef, - shardingService, toLookup), + shardingService, toLookup, lookupMaxRetries), system.dispatcher()); } } @@ -584,6 +475,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private final ActorRef shard; private final DistributedShardedDOMDataTree shardingService; private final DOMDataTreeIdentifier toLookup; + private final int lookupMaxRetries; ShardLeaderLookupTask(final ActorSystem system, final ActorRef replyTo, @@ -591,8 +483,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { final ClusterWrapper clusterWrapper, final ActorRef shard, final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = context; @@ -600,6 +493,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { this.shard = shard; this.shardingService = shardingService; this.toLookup = toLookup; + this.lookupMaxRetries = lookupMaxRetries; } @Override @@ -621,7 +515,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { clusterWrapper.getCurrentMemberName(), toLookup); system.scheduler().scheduleOnce( SHARD_LOOKUP_TASK_INTERVAL, - new FrontendLookupTask(system, replyTo, shardingService, toLookup), + new FrontendLookupTask( + system, replyTo, shardingService, toLookup, lookupMaxRetries), system.dispatcher()); } else { tryReschedule(null); @@ -656,8 +551,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { FrontendLookupTask(final ActorSystem system, final ActorRef replyTo, final DistributedShardedDOMDataTree shardingService, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.shardingService = shardingService; @@ -715,8 +611,9 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { ShardRemovalLookupTask(final ActorSystem system, final ActorRef replyTo, final ActorContext context, - final DOMDataTreeIdentifier toLookup) { - super(replyTo); + final DOMDataTreeIdentifier toLookup, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); this.system = system; this.replyTo = replyTo; this.context = context; @@ -752,13 +649,114 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { } } + /** + * Task for handling the lookup of the backend for the configuration shard. + */ + private static class ConfigShardLookupTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorContext context; + + ConfigShardLookupTask(final ActorSystem system, + final ActorRef replyTo, + final ActorContext context, + final StartConfigShardLookup message, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + } + + @Override + void reschedule(int retries) { + LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher()); + } + + @Override + public void run() { + final Optional localShard = + context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID); + + if (!localShard.isPresent()) { + tryReschedule(null); + } else { + LOG.debug("Local backend for prefix configuration shard lookup successful"); + replyTo.tell(new Status.Success(null), noSender()); + } + } + } + + /** + * Task for handling the readiness state of the config shard. Reports success once the leader is elected. + */ + private static class ConfigShardReadinessTask extends LookupTask { + + private final ActorSystem system; + private final ActorRef replyTo; + private final ActorContext context; + private final ClusterWrapper clusterWrapper; + private final ActorRef shard; + + ConfigShardReadinessTask(final ActorSystem system, + final ActorRef replyTo, + final ActorContext context, + final ClusterWrapper clusterWrapper, + final ActorRef shard, + final int lookupMaxRetries) { + super(replyTo, lookupMaxRetries); + this.system = system; + this.replyTo = replyTo; + this.context = context; + this.clusterWrapper = clusterWrapper; + this.shard = shard; + } + + @Override + void reschedule(int retries) { + LOG.debug("{} - Leader for config shard not found on try: {}, retrying..", + clusterWrapper.getCurrentMemberName(), retries); + system.scheduler().scheduleOnce( + SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher()); + } + + @Override + public void run() { + final Future ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout()); + + ask.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable { + if (throwable != null) { + tryReschedule(throwable); + } else { + final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply; + final java.util.Optional leaderActor = findLeader.getLeaderActor(); + if (leaderActor.isPresent()) { + // leader is found, backend seems ready, check if the frontend is ready + LOG.debug("{} - Leader for config shard is ready. Ending lookup.", + clusterWrapper.getCurrentMemberName()); + replyTo.tell(new Status.Success(null), noSender()); + } else { + tryReschedule(null); + } + } + } + }, system.dispatcher()); + } + } + public static class ShardedDataTreeActorCreator { private DistributedShardedDOMDataTree shardingService; - private DistributedDataStore distributedConfigDatastore; - private DistributedDataStore distributedOperDatastore; + private AbstractDataStore distributedConfigDatastore; + private AbstractDataStore distributedOperDatastore; private ActorSystem actorSystem; private ClusterWrapper cluster; + private int maxRetries; public DistributedShardedDOMDataTree getShardingService() { return shardingService; @@ -787,26 +785,35 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { return cluster; } - public DistributedDataStore getDistributedConfigDatastore() { + public AbstractDataStore getDistributedConfigDatastore() { return distributedConfigDatastore; } public ShardedDataTreeActorCreator setDistributedConfigDatastore( - final DistributedDataStore distributedConfigDatastore) { + final AbstractDataStore distributedConfigDatastore) { this.distributedConfigDatastore = distributedConfigDatastore; return this; } - public DistributedDataStore getDistributedOperDatastore() { + public AbstractDataStore getDistributedOperDatastore() { return distributedOperDatastore; } public ShardedDataTreeActorCreator setDistributedOperDatastore( - final DistributedDataStore distributedOperDatastore) { + final AbstractDataStore distributedOperDatastore) { this.distributedOperDatastore = distributedOperDatastore; return this; } + public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public int getLookupTaskMaxRetries() { + return maxRetries; + } + private void verify() { Preconditions.checkNotNull(shardingService); Preconditions.checkNotNull(actorSystem);