Fix eos entity lookups with YangInstanceIdentifier
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
index 41c83d4723d5425cca424871f54d3a95e4d83880..f66f25aa821d2267cc18633b9752e626171da23f 100644 (file)
@@ -7,6 +7,9 @@
  */
 package org.opendaylight.controller.eos.akka.owner.supervisor;
 
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
 import akka.actor.typed.javadsl.AbstractBehavior;
@@ -14,6 +17,7 @@ import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
@@ -33,13 +37,23 @@ import com.google.common.collect.Sets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
@@ -47,6 +61,7 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEve
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,7 +75,7 @@ import scala.collection.JavaConverters;
 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
 
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
-    private static final String DATACENTER_PREFIX = "dc";
+    private static final String DATACENTER_PREFIX = "dc-";
 
     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
 
@@ -71,6 +86,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
 
     private final Cluster cluster;
     private final SelfUniqueAddress node;
+    private final String dataCenter;
 
     private final Set<String> activeMembers;
 
@@ -81,19 +97,28 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     // reverse lookup of owner to entity
     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
 
+    // only reassign owner for those entities that lost this candidate or is not reachable
+    private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
+            !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
+
+    private final BindingInstanceIdentifierCodec iidCodec;
+
     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
                             final Map<DOMEntity, Set<String>> currentCandidates,
-                            final Map<DOMEntity, String> currentOwners) {
+                            final Map<DOMEntity, String> currentOwners,
+                            final BindingInstanceIdentifierCodec iidCodec) {
         super(context);
+        this.iidCodec = requireNonNull(iidCodec);
 
         final DistributedData distributedData = DistributedData.get(context.getSystem());
         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
 
-        this.cluster = Cluster.get(context.getSystem());
-        this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+        cluster = Cluster.get(context.getSystem());
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+        dataCenter = extractDatacenterRole(cluster.selfMember());
 
-        this.node = distributedData.selfUniqueAddress();
-        this.activeMembers = getActiveMembers(cluster);
+        node = distributedData.selfUniqueAddress();
+        activeMembers = getActiveMembers();
 
         this.currentCandidates = currentCandidates;
         this.currentOwners = currentOwners;
@@ -133,21 +158,37 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
-                                                          final Map<DOMEntity, String> currentOwners) {
-        return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
+            final Map<DOMEntity, String> currentOwners, final BindingInstanceIdentifierCodec iidCodec) {
+        return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec));
     }
 
     @Override
     public Receive<OwnerSupervisorCommand> createReceive() {
         return newReceiveBuilder()
                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+                .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
+                .onMessage(OwnerChanged.class, this::onOwnerChanged)
                 .onMessage(MemberUpEvent.class, this::onPeerUp)
                 .onMessage(MemberDownEvent.class, this::onPeerDown)
                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
+                .onMessage(GetEntitiesRequest.class, this::onGetEntities)
+                .onMessage(GetEntityRequest.class, this::onGetEntity)
+                .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
                 .build();
     }
 
+    private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
+        LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+        command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
+        return IdleSupervisor.create(iidCodec);
+    }
+
+    private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
+        LOG.debug("Owner has changed for {}", command.getResponse().key());
+        return this;
+    }
+
     private void reassignUnreachableOwners() {
         final Set<String> ownersToReassign = new HashSet<>();
         for (final String owner : ownerToEntity.keys()) {
@@ -157,7 +198,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         }
 
         for (final String owner : ownersToReassign) {
-            reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
+            reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
         }
     }
 
@@ -202,7 +243,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
             return;
         }
 
-        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
 
         LOG.debug("currently present candidates: {}", currentlyPresent);
@@ -238,7 +279,8 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
 
         // then reassign those that need new owners
         for (final String toReassign : ownersToReassign) {
-            reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
+            reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
+                    reassignPredicate);
         }
 
         if (currentCandidates.get(entity) == null) {
@@ -248,19 +290,25 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         }
     }
 
-    private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
+    private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
+                                       final BiPredicate<DOMEntity, String> predicate) {
         LOG.debug("Reassigning owners for {}", entities);
         for (final DOMEntity entity : entities) {
-
-            // only reassign owner for those entities that lost this candidate or is not reachable
-            if (!activeMembers.contains(oldOwner)
-                    || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
+            if (predicate.test(entity, oldOwner)) {
                 ownerToEntity.remove(oldOwner, entity);
                 assignOwnerFor(entity);
             }
         }
     }
 
+    private boolean isActiveCandidate(final String candidate) {
+        return activeMembers.contains(candidate);
+    }
+
+    private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
+        return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
+    }
+
     private void assignOwnerFor(final DOMEntity entity) {
         final Set<String> candidatesForEntity = currentCandidates.get(entity);
         if (candidatesForEntity.isEmpty()) {
@@ -325,9 +373,29 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return this;
     }
 
+    private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
+        request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+        final DOMEntity entity = extractEntity(request);
+        request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
+        request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+        return this;
+    }
+
     private void handleReachableEvent(final Set<String> roles) {
-        activeMembers.add(extractRole(roles));
-        assignMissingOwners();
+        if (roles.contains(dataCenter)) {
+            activeMembers.add(extractRole(roles));
+            assignMissingOwners();
+        } else {
+            LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+        }
     }
 
     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
@@ -345,17 +413,40 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private void handleUnreachableEvent(final Set<String> roles) {
-        activeMembers.remove(extractRole(roles));
-        reassignUnreachableOwners();
+        if (roles.contains(dataCenter)) {
+            activeMembers.remove(extractRole(roles));
+            reassignUnreachableOwners();
+        } else {
+            LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+        }
     }
 
-    private static Set<String> getActiveMembers(final Cluster cluster) {
-        final Set<String> activeMembers = new HashSet<>();
-        cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
-        activeMembers.removeAll(cluster.state().getUnreachable().stream()
-                .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+    private Set<String> getActiveMembers() {
+        final CurrentClusterState clusterState = cluster.state();
+        final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
+            .map(OwnerSupervisor::extractRole)
+            .collect(Collectors.toSet());
+
+        return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
+            // We are evaluating the set of roles for each member
+            .map(Member::getRoles)
+            // Filter out any members which do not share our dataCenter
+            .filter(roles -> roles.contains(dataCenter))
+            // Find first legal role
+            .map(OwnerSupervisor::extractRole)
+            // filter out unreachable roles
+            .filter(role -> !unreachableRoles.contains(role))
+            .collect(Collectors.toSet());
+    }
 
-        return activeMembers;
+    private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+        final var name = request.getName();
+        final var iid = name.getInstanceIdentifier();
+        if (iid != null) {
+            return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
+        }
+        final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
+        return new DOMEntity(request.getType().getValue(), str);
     }
 
     private static String extractRole(final Member member) {
@@ -363,7 +454,12 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private static String extractRole(final Set<String> roles) {
-        return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+        return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
+                .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+    }
+
+    private static String extractDatacenterRole(final Member member) {
+        return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
 }