X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2Fowner%2Fsupervisor%2FOwnerSupervisor.java;h=7a4f91ffecf4b68a40bbb39559841674ed5eea87;hp=047040a65dee43dd69d6edfa3019cae9c8c9cdfe;hb=HEAD;hpb=e2b581876b6ca70033affe321e07267dd7170705 diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java index 047040a65d..1e2a41beca 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java @@ -7,13 +7,16 @@ */ package org.opendaylight.controller.eos.akka.owner.supervisor; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.Member; import akka.cluster.ddata.LWWRegister; import akka.cluster.ddata.LWWRegisterKey; @@ -25,6 +28,7 @@ import akka.cluster.ddata.typed.javadsl.Replicator; import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; import akka.cluster.typed.Cluster; import akka.cluster.typed.Subscribe; +import akka.pattern.StatusReply; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -33,16 +37,25 @@ import com.google.common.collect.Sets; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiPredicate; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; @@ -50,6 +63,7 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEve import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +74,7 @@ import scala.collection.JavaConverters; * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates. * On cluster up/down etc. events the owners are reassigned if possible. */ -public final class OwnerSupervisor extends AbstractBehavior { +public final class OwnerSupervisor extends AbstractSupervisor { private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class); private static final String DATACENTER_PREFIX = "dc-"; @@ -70,7 +84,7 @@ public final class OwnerSupervisor extends AbstractBehavior clock = (currentTimestamp, value) -> currentTimestamp + 1; + private static final LWWRegister.Clock CLOCK = (currentTimestamp, value) -> currentTimestamp + 1; private final Cluster cluster; private final SelfUniqueAddress node; @@ -89,10 +103,14 @@ public final class OwnerSupervisor extends AbstractBehavior reassignPredicate = (entity, candidate) -> !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate); + private final BindingInstanceIdentifierCodec iidCodec; + private OwnerSupervisor(final ActorContext context, final Map> currentCandidates, - final Map currentOwners) { + final Map currentOwners, + final BindingInstanceIdentifierCodec iidCodec) { super(context); + this.iidCodec = requireNonNull(iidCodec); final DistributedData distributedData = DistributedData.get(context.getSystem()); final ActorRef replicator = distributedData.replicator(); @@ -135,15 +153,14 @@ public final class OwnerSupervisor extends AbstractBehavior>>(context, replicator, - Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new); + candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new); LOG.debug("Owner Supervisor started"); } public static Behavior create(final Map> currentCandidates, - final Map currentOwners) { - return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners)); + final Map currentOwners, final BindingInstanceIdentifierCodec iidCodec) { + return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec)); } @Override @@ -156,13 +173,18 @@ public final class OwnerSupervisor extends AbstractBehavior onDeactivateDatacenter(final DeactivateDataCenter command) { LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember()); command.getReplyTo().tell(DataCenterDeactivated.INSTANCE); - return IdleSupervisor.create(); + return IdleSupervisor.create(iidCodec); } private Behavior onOwnerChanged(final OwnerChanged command) { @@ -173,7 +195,7 @@ public final class OwnerSupervisor extends AbstractBehavior ownersToReassign = new HashSet<>(); for (final String owner : ownerToEntity.keys()) { - if (!activeMembers.contains(owner)) { + if (!isActiveCandidate(owner)) { ownersToReassign.add(owner); } } @@ -224,7 +246,7 @@ public final class OwnerSupervisor extends AbstractBehavior currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet()); + final Set currentlyPresent = currentCandidates.getOrDefault(entity, Set.of()); final Set difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates)); LOG.debug("currently present candidates: {}", currentlyPresent); @@ -239,8 +261,10 @@ public final class OwnerSupervisor extends AbstractBehavior predicate) { LOG.debug("Reassigning owners for {}", entities); for (final DOMEntity entity : entities) { - - if (predicate.test(entity, oldOwner)) { + + if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) { + // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY + // candidate + LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity); + continue; + } ownerToEntity.remove(oldOwner, entity); assignOwnerFor(entity); } @@ -289,7 +318,11 @@ public final class OwnerSupervisor extends AbstractBehavior(node.uniqueAddress(), candidate, 0), Replicator.writeLocal(), askReplyTo, - register -> register.withValue(node, candidate, clock)), + register -> register.withValue(node, candidate, CLOCK)), OwnerChanged::new); } @@ -356,6 +389,24 @@ public final class OwnerSupervisor extends AbstractBehavior onGetEntities(final GetEntitiesBackendRequest request) { + request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates))); + return this; + } + + private Behavior onGetEntity(final GetEntityBackendRequest request) { + final DOMEntity entity = extractEntity(request); + request.getReplyTo().tell(StatusReply.success( + new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity)))); + return this; + } + + private Behavior onGetEntityOwner(final GetEntityOwnerBackendRequest request) { + request.getReplyTo().tell( + StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request))))); + return this; + } + private void handleReachableEvent(final Set roles) { if (roles.contains(dataCenter)) { activeMembers.add(extractRole(roles)); @@ -389,21 +440,31 @@ public final class OwnerSupervisor extends AbstractBehavior getActiveMembers() { - final Set members = new HashSet<>(); - cluster.state().getMembers().forEach(member -> members.add(extractRole(member))); - // filter out unreachable - members.removeAll(cluster.state().getUnreachable().stream() - .map(OwnerSupervisor::extractRole) - .collect(Collectors.toSet())); - - // filter out members not from our datacenter - cluster.state().getMembers().forEach(member -> { - if (!member.roles().contains(dataCenter)) { - members.remove(extractRole(member)); - } - }); + final CurrentClusterState clusterState = cluster.state(); + final Set unreachableRoles = clusterState.getUnreachable().stream() + .map(OwnerSupervisor::extractRole) + .collect(Collectors.toSet()); + + return StreamSupport.stream(clusterState.getMembers().spliterator(), false) + // We are evaluating the set of roles for each member + .map(Member::getRoles) + // Filter out any members which do not share our dataCenter + .filter(roles -> roles.contains(dataCenter)) + // Find first legal role + .map(OwnerSupervisor::extractRole) + // filter out unreachable roles + .filter(role -> !unreachableRoles.contains(role)) + .collect(Collectors.toSet()); + } - return members; + private DOMEntity extractEntity(final AbstractEntityRequest request) { + final var name = request.getName(); + final var iid = name.getInstanceIdentifier(); + if (iid != null) { + return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid)); + } + final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name); + return new DOMEntity(request.getType().getValue(), str); } private static String extractRole(final Member member) { @@ -415,8 +476,13 @@ public final class OwnerSupervisor extends AbstractBehavior new IllegalArgumentException("No valid role found.")); } - private String extractDatacenterRole(final Member member) { + private static String extractDatacenterRole(final Member member) { return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX)) .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found.")); } + + @Override + Logger getLogger() { + return LOG; + } }