import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
-import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
private final int lookupTaskMaxRetries;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
- private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
-
- private final Cluster cluster;
-
- private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
clusterWrapper.subscribeToMemberEvents(self());
- cluster = Cluster.get(actorSystem);
}
@Override
// schedule a notification task for the reply
actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
new ConfigShardLookupTask(
- actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+ actorSystem, getSender(), context, message, lookupTaskMaxRetries),
actorSystem.dispatcher());
}
private final ActorSystem system;
private final ActorRef replyTo;
private final ActorContext context;
- private final ClusterWrapper clusterWrapper;
- private final int lookupTaskMaxRetries;
ConfigShardLookupTask(final ActorSystem system,
final ActorRef replyTo,
final ActorContext context,
- final ClusterWrapper clusterWrapper,
final StartConfigShardLookup message,
final int lookupMaxRetries) {
super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.context = context;
- this.clusterWrapper = clusterWrapper;
- this.lookupTaskMaxRetries = lookupMaxRetries;
}
@Override
if (!localShard.isPresent()) {
tryReschedule(null);
} else {
- LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL,
- new ConfigShardReadinessTask(
- system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
- system.dispatcher());
+ LOG.debug("Local backend for prefix configuration shard lookup successful");
+ replyTo.tell(new Status.Success(null), noSender());
}
}
}
return this;
}
- public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
- this.cluster = cluster;
+ public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
+ this.cluster = clusterWrapper;
return this;
}
return this;
}
- public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
- this.maxRetries = maxRetries;
+ public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
+ this.maxRetries = newMaxRetries;
return this;
}