import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
+import akka.pattern.StatusReply;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.stream.StreamSupport;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
* registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
* On cluster up/down etc. events the owners are reassigned if possible.
*/
-public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSupervisor extends AbstractSupervisor {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
private static final String DATACENTER_PREFIX = "dc-";
// Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
// increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
// running in a cluster-singleton
- private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
+ private static final LWWRegister.Clock<String> CLOCK = (currentTimestamp, value) -> currentTimestamp + 1;
private final Cluster cluster;
private final SelfUniqueAddress node;
});
cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
- new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
- Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+ candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
LOG.debug("Owner Supervisor started");
}
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
- .onMessage(GetEntitiesRequest.class, this::onGetEntities)
- .onMessage(GetEntityRequest.class, this::onGetEntity)
- .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
+ .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
+ .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
+ .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
+ .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+ .onMessage(ClearCandidates.class, this::finishClearCandidates)
.build();
}
private void reassignUnreachableOwners() {
final Set<String> ownersToReassign = new HashSet<>();
for (final String owner : ownerToEntity.keys()) {
- if (!activeMembers.contains(owner)) {
+ if (!isActiveCandidate(owner)) {
ownersToReassign.add(owner);
}
}
LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
currentCandidates.get(entity).add(toCheck);
- if (!currentOwners.containsKey(entity)) {
- // might as well assign right away when we don't have an owner
+ final String currentOwner = currentOwners.get(entity);
+
+ if (currentOwner == null || !activeMembers.contains(currentOwner)) {
+ // might as well assign right away when we don't have an owner or its unreachable
assignOwnerFor(entity);
}
LOG.debug("Reassigning owners for {}", entities);
for (final DOMEntity entity : entities) {
if (predicate.test(entity, oldOwner)) {
+
+ if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) {
+ // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY
+ // candidate
+ LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity);
+ continue;
+ }
ownerToEntity.remove(oldOwner, entity);
assignOwnerFor(entity);
}
return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
}
+ private boolean hasSingleCandidate(final DOMEntity entity) {
+ return currentCandidates.getOrDefault(entity, Set.of()).size() == 1;
+ }
+
private void assignOwnerFor(final DOMEntity entity) {
final Set<String> candidatesForEntity = currentCandidates.get(entity);
if (candidatesForEntity.isEmpty()) {
new LWWRegister<>(node.uniqueAddress(), candidate, 0),
Replicator.writeLocal(),
askReplyTo,
- register -> register.withValue(node, candidate, clock)),
+ register -> register.withValue(node, candidate, CLOCK)),
OwnerChanged::new);
}
return this;
}
- private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
- request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+ private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
+ request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
return this;
}
- private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+ private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
final DOMEntity entity = extractEntity(request);
- request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+ request.getReplyTo().tell(StatusReply.success(
+ new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
return this;
}
- private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
- request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+ private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
+ request.getReplyTo().tell(
+ StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
return this;
}
return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
+
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}