package org.opendaylight.controller.cluster.sharding;
-import static akka.actor.ActorRef.noSender;
-
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
-import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
private final int lookupTaskMaxRetries;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
- private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
-
- private final Cluster cluster;
-
- private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
clusterWrapper.subscribeToMemberEvents(self());
- cluster = Cluster.get(actorSystem);
}
@Override
}
@Override
- protected void handleRecover(final Object message) throws Exception {
+ protected void handleRecover(final Object message) {
LOG.debug("Received a recover message {}", message);
}
@Override
- protected void handleCommand(final Object message) throws Exception {
+ protected void handleCommand(final Object message) {
LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
// fastpath if we have no peers
if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
- getSender().tell(new Status.Success(null), noSender());
+ getSender().tell(new Status.Success(null), ActorRef.noSender());
}
final ActorRef sender = getSender();
futures.toArray(new CompletableFuture[futures.size()]));
combinedFuture
- .thenRun(() -> sender.tell(new Success(null), noSender()))
+ .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
.exceptionally(throwable -> {
sender.tell(new Status.Failure(throwable), self());
return null;
final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
if (registration == null) {
LOG.warn("The notification contained a path on which no producer is registered, throwing away");
- getSender().tell(new Status.Success(null), noSender());
+ getSender().tell(new Status.Success(null), ActorRef.noSender());
return;
}
try {
registration.close();
- getSender().tell(new Status.Success(null), noSender());
+ getSender().tell(new Status.Success(null), ActorRef.noSender());
} catch (final DOMDataTreeProducerException e) {
LOG.error("Unable to close producer", e);
- getSender().tell(new Status.Failure(e), noSender());
+ getSender().tell(new Status.Failure(e), ActorRef.noSender());
}
}
// schedule a notification task for the reply
actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
new ConfigShardLookupTask(
- actorSystem, getSender(), context, clusterWrapper, message, lookupTaskMaxRetries),
+ actorSystem, getSender(), context, message, lookupTaskMaxRetries),
actorSystem.dispatcher());
}
localShardFuture.onComplete(new OnComplete<ActorRef>() {
@Override
- public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+ public void onComplete(Throwable throwable, ActorRef actorRef) {
if (throwable != null) {
tryReschedule(throwable);
} else {
ask.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+ public void onComplete(final Throwable throwable, final Object findLeaderReply) {
if (throwable != null) {
tryReschedule(throwable);
} else {
shardingService.lookupShardFrontend(toLookup);
if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
- replyTo.tell(new Success(null), noSender());
+ replyTo.tell(new Success(null), ActorRef.noSender());
} else {
tryReschedule(null);
}
localShardFuture.onComplete(new OnComplete<ActorRef>() {
@Override
- public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+ public void onComplete(Throwable throwable, ActorRef actorRef) {
if (throwable != null) {
//TODO Shouldn't we check why findLocalShard failed?
LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
toLookup);
- replyTo.tell(new Success(null), noSender());
+ replyTo.tell(new Success(null), ActorRef.noSender());
} else {
tryReschedule(null);
}
private final ActorSystem system;
private final ActorRef replyTo;
private final ActorContext context;
- private final ClusterWrapper clusterWrapper;
- private final int lookupTaskMaxRetries;
ConfigShardLookupTask(final ActorSystem system,
final ActorRef replyTo,
final ActorContext context,
- final ClusterWrapper clusterWrapper,
final StartConfigShardLookup message,
final int lookupMaxRetries) {
super(replyTo, lookupMaxRetries);
this.system = system;
this.replyTo = replyTo;
this.context = context;
- this.clusterWrapper = clusterWrapper;
- this.lookupTaskMaxRetries = lookupMaxRetries;
}
@Override
if (!localShard.isPresent()) {
tryReschedule(null);
} else {
- LOG.debug("Local backend for prefix configuration shard lookup successful, starting leader lookup..");
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL,
- new ConfigShardReadinessTask(
- system, replyTo, context, clusterWrapper, localShard.get(), lookupTaskMaxRetries),
- system.dispatcher());
+ LOG.debug("Local backend for prefix configuration shard lookup successful");
+ replyTo.tell(new Status.Success(null), ActorRef.noSender());
}
}
}
ask.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(final Throwable throwable, final Object findLeaderReply) throws Throwable {
+ public void onComplete(final Throwable throwable, final Object findLeaderReply) {
if (throwable != null) {
tryReschedule(throwable);
} else {
// leader is found, backend seems ready, check if the frontend is ready
LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
clusterWrapper.getCurrentMemberName());
- replyTo.tell(new Status.Success(null), noSender());
+ replyTo.tell(new Status.Success(null), ActorRef.noSender());
} else {
tryReschedule(null);
}
return this;
}
- public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
- this.cluster = cluster;
+ public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
+ this.cluster = clusterWrapper;
return this;
}
return this;
}
- public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int maxRetries) {
- this.maxRetries = maxRetries;
+ public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
+ this.maxRetries = newMaxRetries;
return this;
}