import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
-import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
// for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
private final ActorContext actorContext;
private final ShardingServiceAddressResolver resolver;
- private final DistributedDataStore distributedConfigDatastore;
- private final DistributedDataStore distributedOperDatastore;
+ private final AbstractDataStore distributedConfigDatastore;
+ private final AbstractDataStore distributedOperDatastore;
private final int lookupTaskMaxRetries;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
- private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
-
- private final Cluster cluster;
-
- private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
clusterWrapper.subscribeToMemberEvents(self());
- cluster = Cluster.get(actorSystem);
}
@Override
final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
- combinedFuture.thenRun(() -> {
- sender.tell(new Status.Success(null), noSender());
- }).exceptionally(throwable -> {
- sender.tell(new Status.Failure(throwable), self());
- return null;
- });
+ combinedFuture
+ .thenRun(() -> sender.tell(new Success(null), noSender()))
+ .exceptionally(throwable -> {
+ sender.tell(new Status.Failure(throwable), self());
+ return null;
+ });
}
private void onNotifyProducerCreated(final NotifyProducerCreated message) {
// schedule a notification task for the reply
actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
new ConfigShardLookupTask(
- actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+ actorSystem, getSender(), context, message, lookupTaskMaxRetries),
actorSystem.dispatcher());
}
private final ActorSystem system;
private final ActorRef replyTo;
private final ActorContext context;
- private final ClusterWrapper clusterWrapper;
- private final int lookupTaskMaxRetries;
ConfigShardLookupTask(final ActorSystem system,
final ActorRef replyTo,
final ActorContext context,
- final ClusterWrapper clusterWrapper,
final StartConfigShardLookup message,
final int lookupMaxRetries) {
super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.context = context;
- this.clusterWrapper = clusterWrapper;
- this.lookupTaskMaxRetries = lookupMaxRetries;
}
@Override
if (!localShard.isPresent()) {
tryReschedule(null);
} else {
- LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL,
- new ConfigShardReadinessTask(
- system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
- system.dispatcher());
+ LOG.debug("Local backend for prefix configuration shard lookup successful");
+ replyTo.tell(new Status.Success(null), noSender());
}
}
}
public static class ShardedDataTreeActorCreator {
private DistributedShardedDOMDataTree shardingService;
- private DistributedDataStore distributedConfigDatastore;
- private DistributedDataStore distributedOperDatastore;
+ private AbstractDataStore distributedConfigDatastore;
+ private AbstractDataStore distributedOperDatastore;
private ActorSystem actorSystem;
private ClusterWrapper cluster;
private int maxRetries;
return this;
}
- public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
- this.cluster = cluster;
+ public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
+ this.cluster = clusterWrapper;
return this;
}
return cluster;
}
- public DistributedDataStore getDistributedConfigDatastore() {
+ public AbstractDataStore getDistributedConfigDatastore() {
return distributedConfigDatastore;
}
public ShardedDataTreeActorCreator setDistributedConfigDatastore(
- final DistributedDataStore distributedConfigDatastore) {
+ final AbstractDataStore distributedConfigDatastore) {
this.distributedConfigDatastore = distributedConfigDatastore;
return this;
}
- public DistributedDataStore getDistributedOperDatastore() {
+ public AbstractDataStore getDistributedOperDatastore() {
return distributedOperDatastore;
}
public ShardedDataTreeActorCreator setDistributedOperDatastore(
- final DistributedDataStore distributedOperDatastore) {
+ final AbstractDataStore distributedOperDatastore) {
this.distributedOperDatastore = distributedOperDatastore;
return this;
}
- public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
- this.maxRetries = maxRetries;
+ public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
+ this.maxRetries = newMaxRetries;
return this;
}