*/
package org.opendaylight.controller.eos.akka.owner.supervisor;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
+import akka.pattern.StatusReply;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
!isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
+ private final BindingInstanceIdentifierCodec iidCodec;
+
private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
final Map<DOMEntity, Set<String>> currentCandidates,
- final Map<DOMEntity, String> currentOwners) {
+ final Map<DOMEntity, String> currentOwners,
+ final BindingInstanceIdentifierCodec iidCodec) {
super(context);
+ this.iidCodec = requireNonNull(iidCodec);
final DistributedData distributedData = DistributedData.get(context.getSystem());
final ActorRef<Replicator.Command> replicator = distributedData.replicator();
}
public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
- final Map<DOMEntity, String> currentOwners) {
- return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
+ final Map<DOMEntity, String> currentOwners, final BindingInstanceIdentifierCodec iidCodec) {
+ return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec));
}
@Override
return newReceiveBuilder()
.onMessage(CandidatesChanged.class, this::onCandidatesChanged)
.onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
+ .onMessage(OwnerChanged.class, this::onOwnerChanged)
.onMessage(MemberUpEvent.class, this::onPeerUp)
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
+ .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
+ .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
+ .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
.build();
}
private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
- return IdleSupervisor.create();
+ return IdleSupervisor.create(iidCodec);
+ }
+
+ private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
+ LOG.debug("Owner has changed for {}", command.getResponse().key());
+ return this;
}
private void reassignUnreachableOwners() {
return;
}
- final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+ final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
LOG.debug("currently present candidates: {}", currentlyPresent);
final BiPredicate<DOMEntity, String> predicate) {
LOG.debug("Reassigning owners for {}", entities);
for (final DOMEntity entity : entities) {
-
-
if (predicate.test(entity, oldOwner)) {
ownerToEntity.remove(oldOwner, entity);
assignOwnerFor(entity);
}
private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
- return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
+ return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
}
private void assignOwnerFor(final DOMEntity entity) {
return this;
}
+ private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
+ request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
+ final DOMEntity entity = extractEntity(request);
+ request.getReplyTo().tell(StatusReply.success(
+ new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
+ request.getReplyTo().tell(
+ StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
+ return this;
+ }
+
private void handleReachableEvent(final Set<String> roles) {
if (roles.contains(dataCenter)) {
activeMembers.add(extractRole(roles));
}
private Set<String> getActiveMembers() {
- final Set<String> members = new HashSet<>();
- cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
- // filter out unreachable
- members.removeAll(cluster.state().getUnreachable().stream()
- .map(OwnerSupervisor::extractRole)
- .collect(Collectors.toSet()));
-
- // filter out members not from our datacenter
- cluster.state().getMembers().forEach(member -> {
- if (!member.roles().contains(dataCenter)) {
- members.remove(extractRole(member));
- }
- });
+ final CurrentClusterState clusterState = cluster.state();
+ final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
+ .map(OwnerSupervisor::extractRole)
+ .collect(Collectors.toSet());
+
+ return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
+ // We are evaluating the set of roles for each member
+ .map(Member::getRoles)
+ // Filter out any members which do not share our dataCenter
+ .filter(roles -> roles.contains(dataCenter))
+ // Find first legal role
+ .map(OwnerSupervisor::extractRole)
+ // filter out unreachable roles
+ .filter(role -> !unreachableRoles.contains(role))
+ .collect(Collectors.toSet());
+ }
- return members;
+ private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+ final var name = request.getName();
+ final var iid = name.getInstanceIdentifier();
+ if (iid != null) {
+ return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
+ }
+ final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
+ return new DOMEntity(request.getType().getValue(), str);
}
private static String extractRole(final Member member) {
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
- private String extractDatacenterRole(final Member member) {
+ private static String extractDatacenterRole(final Member member) {
return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}