atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSupervisor.java
index 5210f43dc66cbb9ade382f301cd8c42a2ca9844f..1e2a41beca3625f8521602f5129f005fdb2572d7 100644 (file)
@@ -7,13 +7,16 @@
  */
 package org.opendaylight.controller.eos.akka.owner.supervisor;
 
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
 import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
@@ -25,6 +28,7 @@ import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
 import akka.cluster.typed.Cluster;
 import akka.cluster.typed.Subscribe;
+import akka.pattern.StatusReply;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -33,23 +37,25 @@ import com.google.common.collect.Sets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
@@ -57,6 +63,7 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUpEve
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerChanged;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +74,7 @@ import scala.collection.JavaConverters;
  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
  * On cluster up/down etc. events the owners are reassigned if possible.
  */
-public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSupervisor extends AbstractSupervisor {
 
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
     private static final String DATACENTER_PREFIX = "dc-";
@@ -77,7 +84,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     // Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
     // increasing counter which is fine for our needs as we only ever have a single writer since t supervisor is
     // running in a cluster-singleton
-    private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
+    private static final LWWRegister.Clock<String> CLOCK = (currentTimestamp, value) -> currentTimestamp + 1;
 
     private final Cluster cluster;
     private final SelfUniqueAddress node;
@@ -96,10 +103,14 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
             !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
 
+    private final BindingInstanceIdentifierCodec iidCodec;
+
     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
                             final Map<DOMEntity, Set<String>> currentCandidates,
-                            final Map<DOMEntity, String> currentOwners) {
+                            final Map<DOMEntity, String> currentOwners,
+                            final BindingInstanceIdentifierCodec iidCodec) {
         super(context);
+        this.iidCodec = requireNonNull(iidCodec);
 
         final DistributedData distributedData = DistributedData.get(context.getSystem());
         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
@@ -142,15 +153,14 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 });
         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
 
-        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
-            Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+        candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
 
         LOG.debug("Owner Supervisor started");
     }
 
     public static Behavior<OwnerSupervisorCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,
-                                                          final Map<DOMEntity, String> currentOwners) {
-        return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners));
+            final Map<DOMEntity, String> currentOwners, final BindingInstanceIdentifierCodec iidCodec) {
+        return Behaviors.setup(ctx -> new OwnerSupervisor(ctx, currentCandidates, currentOwners, iidCodec));
     }
 
     @Override
@@ -163,16 +173,18 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .onMessage(MemberDownEvent.class, this::onPeerDown)
                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
-                .onMessage(GetEntitiesRequest.class, this::onGetEntities)
-                .onMessage(GetEntityRequest.class, this::onGetEntity)
-                .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
+                .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
+                .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
+                .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
+                .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+                .onMessage(ClearCandidates.class, this::finishClearCandidates)
                 .build();
     }
 
     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
         command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
-        return IdleSupervisor.create();
+        return IdleSupervisor.create(iidCodec);
     }
 
     private Behavior<OwnerSupervisorCommand> onOwnerChanged(final OwnerChanged command) {
@@ -183,7 +195,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     private void reassignUnreachableOwners() {
         final Set<String> ownersToReassign = new HashSet<>();
         for (final String owner : ownerToEntity.keys()) {
-            if (!activeMembers.contains(owner)) {
+            if (!isActiveCandidate(owner)) {
                 ownersToReassign.add(owner);
             }
         }
@@ -234,7 +246,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
             return;
         }
 
-        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Collections.emptySet());
+        final Set<String> currentlyPresent = currentCandidates.getOrDefault(entity, Set.of());
         final Set<String> difference = ImmutableSet.copyOf(Sets.symmetricDifference(currentlyPresent, candidates));
 
         LOG.debug("currently present candidates: {}", currentlyPresent);
@@ -249,8 +261,10 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
                 currentCandidates.get(entity).add(toCheck);
 
-                if (!currentOwners.containsKey(entity)) {
-                    // might as well assign right away when we don't have an owner
+                final String currentOwner = currentOwners.get(entity);
+
+                if (currentOwner == null || !activeMembers.contains(currentOwner)) {
+                    // might as well assign right away when we don't have an owner or its unreachable
                     assignOwnerFor(entity);
                 }
 
@@ -286,6 +300,13 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         LOG.debug("Reassigning owners for {}", entities);
         for (final DOMEntity entity : entities) {
             if (predicate.test(entity, oldOwner)) {
+
+                if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) {
+                    // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY
+                    // candidate
+                    LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity);
+                    continue;
+                }
                 ownerToEntity.remove(oldOwner, entity);
                 assignOwnerFor(entity);
             }
@@ -297,7 +318,11 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
-        return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
+        return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
+    }
+
+    private boolean hasSingleCandidate(final DOMEntity entity) {
+        return currentCandidates.getOrDefault(entity, Set.of()).size() == 1;
     }
 
     private void assignOwnerFor(final DOMEntity entity) {
@@ -346,7 +371,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                         new LWWRegister<>(node.uniqueAddress(), candidate, 0),
                         Replicator.writeLocal(),
                         askReplyTo,
-                        register -> register.withValue(node, candidate, clock)),
+                        register -> register.withValue(node, candidate, CLOCK)),
                 OwnerChanged::new);
     }
 
@@ -364,19 +389,21 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return this;
     }
 
-    private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
-        request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+    private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
+        request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
         return this;
     }
 
-    private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+    private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
         final DOMEntity entity = extractEntity(request);
-        request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+        request.getReplyTo().tell(StatusReply.success(
+                new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
         return this;
     }
 
-    private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
-        request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+    private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
+        request.getReplyTo().tell(
+                StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
         return this;
     }
 
@@ -413,25 +440,31 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private Set<String> getActiveMembers() {
-        final Set<String> members = new HashSet<>();
-        cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
-        // filter out unreachable
-        members.removeAll(cluster.state().getUnreachable().stream()
-                .map(OwnerSupervisor::extractRole)
-                .collect(Collectors.toSet()));
-
-        // filter out members not from our datacenter
-        cluster.state().getMembers().forEach(member -> {
-            if (!member.roles().contains(dataCenter)) {
-                members.remove(extractRole(member));
-            }
-        });
-
-        return members;
+        final CurrentClusterState clusterState = cluster.state();
+        final Set<String> unreachableRoles = clusterState.getUnreachable().stream()
+            .map(OwnerSupervisor::extractRole)
+            .collect(Collectors.toSet());
+
+        return StreamSupport.stream(clusterState.getMembers().spliterator(), false)
+            // We are evaluating the set of roles for each member
+            .map(Member::getRoles)
+            // Filter out any members which do not share our dataCenter
+            .filter(roles -> roles.contains(dataCenter))
+            // Find first legal role
+            .map(OwnerSupervisor::extractRole)
+            // filter out unreachable roles
+            .filter(role -> !unreachableRoles.contains(role))
+            .collect(Collectors.toSet());
     }
 
-    private static DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
-        return new DOMEntity(request.getType().getValue(), request.getName().getValue());
+    private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+        final var name = request.getName();
+        final var iid = name.getInstanceIdentifier();
+        if (iid != null) {
+            return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
+        }
+        final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
+        return new DOMEntity(request.getType().getValue(), str);
     }
 
     private static String extractRole(final Member member) {
@@ -443,8 +476,13 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
 
-    private String extractDatacenterRole(final Member member) {
+    private static String extractDatacenterRole(final Member member) {
         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
+
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }