X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2Fowner%2Fsupervisor%2FOwnerSupervisor.java;h=e56ed59f3cfcd7f8cc521c0b3bb50eb557e44330;hb=refs%2Fchanges%2F21%2F97321%2F3;hp=c012afe6a1dc90f8860fca275d0f558f78c347fc;hpb=95a52fc4c0a1c9a9abe787b3f0e7ffa340709a96;p=controller.git diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java index c012afe6a1..e56ed59f3c 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java @@ -14,6 +14,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.Member; import akka.cluster.ddata.LWWRegister; import akka.cluster.ddata.LWWRegisterKey; @@ -33,14 +34,23 @@ import com.google.common.collect.Sets; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiPredicate; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; @@ -83,6 +93,10 @@ public final class OwnerSupervisor extends AbstractBehavior ownerToEntity = HashMultimap.create(); + // only reassign owner for those entities that lost this candidate or is not reachable + private final BiPredicate reassignPredicate = (entity, candidate) -> + !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate); + private OwnerSupervisor(final ActorContext context, final Map> currentCandidates, final Map currentOwners) { @@ -145,18 +159,28 @@ public final class OwnerSupervisor extends AbstractBehavior onDeactivateDatacenter(final DeactivateDataCenter command) { LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember()); + command.getReplyTo().tell(DataCenterDeactivated.INSTANCE); return IdleSupervisor.create(); } + private Behavior onOwnerChanged(final OwnerChanged command) { + LOG.debug("Owner has changed for {}", command.getResponse().key()); + return this; + } + private void reassignUnreachableOwners() { final Set ownersToReassign = new HashSet<>(); for (final String owner : ownerToEntity.keys()) { @@ -166,7 +190,7 @@ public final class OwnerSupervisor extends AbstractBehavior currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet()); + final Set currentlyPresent = currentCandidates.getOrDefault(entity, Set.of()); final Set difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates)); LOG.debug("currently present candidates: {}", currentlyPresent); @@ -247,7 +271,8 @@ public final class OwnerSupervisor extends AbstractBehavior entities) { + private void reassignCandidatesFor(final String oldOwner, final Collection entities, + final BiPredicate predicate) { LOG.debug("Reassigning owners for {}", entities); for (final DOMEntity entity : entities) { - - // only reassign owner for those entities that lost this candidate or is not reachable - if (!activeMembers.contains(oldOwner) - || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) { + if (predicate.test(entity, oldOwner)) { ownerToEntity.remove(oldOwner, entity); assignOwnerFor(entity); } } } + private boolean isActiveCandidate(final String candidate) { + return activeMembers.contains(candidate); + } + + private boolean isCandidateFor(final DOMEntity entity, final String candidate) { + return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate); + } + private void assignOwnerFor(final DOMEntity entity) { final Set candidatesForEntity = currentCandidates.get(entity); if (candidatesForEntity.isEmpty()) { @@ -334,6 +365,22 @@ public final class OwnerSupervisor extends AbstractBehavior onGetEntities(final GetEntitiesRequest request) { + request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates)); + return this; + } + + private Behavior onGetEntity(final GetEntityRequest request) { + final DOMEntity entity = extractEntity(request); + request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity))); + return this; + } + + private Behavior onGetEntityOwner(final GetEntityOwnerRequest request) { + request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request)))); + return this; + } + private void handleReachableEvent(final Set roles) { if (roles.contains(dataCenter)) { activeMembers.add(extractRole(roles)); @@ -367,21 +414,25 @@ public final class OwnerSupervisor extends AbstractBehavior getActiveMembers() { - final Set members = new HashSet<>(); - cluster.state().getMembers().forEach(member -> members.add(extractRole(member))); - // filter out unreachable - members.removeAll(cluster.state().getUnreachable().stream() - .map(OwnerSupervisor::extractRole) - .collect(Collectors.toSet())); - - // filter out members not from our datacenter - cluster.state().getMembers().forEach(member -> { - if (!member.roles().contains(dataCenter)) { - members.remove(extractRole(member)); - } - }); + final CurrentClusterState clusterState = cluster.state(); + final Set unreachableRoles = clusterState.getUnreachable().stream() + .map(OwnerSupervisor::extractRole) + .collect(Collectors.toSet()); + + return StreamSupport.stream(clusterState.getMembers().spliterator(), false) + // We are evaluating the set of roles for each member + .map(Member::getRoles) + // Filter out any members which do not share our dataCenter + .filter(roles -> roles.contains(dataCenter)) + // Find first legal role + .map(OwnerSupervisor::extractRole) + // filter out unreachable roles + .filter(role -> !unreachableRoles.contains(role)) + .collect(Collectors.toSet()); + } - return members; + private static DOMEntity extractEntity(final AbstractEntityRequest request) { + return new DOMEntity(request.getType().getValue(), request.getName().getValue()); } private static String extractRole(final Member member) { @@ -393,7 +444,7 @@ public final class OwnerSupervisor extends AbstractBehavior new IllegalArgumentException("No valid role found.")); } - private String extractDatacenterRole(final Member member) { + private static String extractDatacenterRole(final Member member) { return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX)) .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found.")); }