Do not use RevisionSourceIdentifier
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
index 9841b65..7a4f91f 100644 (file)
@@ -12,7 +12,6 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
@@ -47,6 +46,8 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
@@ -73,7 +74,7 @@ import scala.collection.JavaConverters;
  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
  * On cluster up/down etc. events the owners are reassigned if possible.
  */
-public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSupervisor extends AbstractSupervisor {
 
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
     private static final String DATACENTER_PREFIX = "dc-";
@@ -152,8 +153,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 });
         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
 
-        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
-            Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+        candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
 
         LOG.debug("Owner Supervisor started");
     }
@@ -176,6 +176,8 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
                 .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
                 .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
+                .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+                .onMessage(ClearCandidates.class, this::finishClearCandidates)
                 .build();
     }
 
@@ -193,7 +195,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     private void reassignUnreachableOwners() {
         final Set<String> ownersToReassign = new HashSet<>();
         for (final String owner : ownerToEntity.keys()) {
-            if (!activeMembers.contains(owner)) {
+            if (!isActiveCandidate(owner)) {
                 ownersToReassign.add(owner);
             }
         }
@@ -259,8 +261,10 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
                 currentCandidates.get(entity).add(toCheck);
 
-                if (!currentOwners.containsKey(entity)) {
-                    // might as well assign right away when we don't have an owner
+                final String currentOwner = currentOwners.get(entity);
+
+                if (currentOwner == null || !activeMembers.contains(currentOwner)) {
+                    // might as well assign right away when we don't have an owner or its unreachable
                     assignOwnerFor(entity);
                 }
 
@@ -296,6 +300,13 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         LOG.debug("Reassigning owners for {}", entities);
         for (final DOMEntity entity : entities) {
             if (predicate.test(entity, oldOwner)) {
+
+                if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) {
+                    // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY
+                    // candidate
+                    LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity);
+                    continue;
+                }
                 ownerToEntity.remove(oldOwner, entity);
                 assignOwnerFor(entity);
             }
@@ -310,6 +321,10 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
     }
 
+    private boolean hasSingleCandidate(final DOMEntity entity) {
+        return currentCandidates.getOrDefault(entity, Set.of()).size() == 1;
+    }
+
     private void assignOwnerFor(final DOMEntity entity) {
         final Set<String> candidatesForEntity = currentCandidates.get(entity);
         if (candidatesForEntity.isEmpty()) {
@@ -465,4 +480,9 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
+
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }