Cleanup candidate registrations from previous instance 59/96159/13
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 11 May 2021 10:32:01 +0000 (12:32 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 30 Jun 2021 08:18:29 +0000 (08:18 +0000)
JIRA: CONTROLLER-1982
Change-Id: I5337b08f6fd951a7a2acabb935cbf2e2156f5985
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java [new file with mode: 0644]

index 9e646fa5c7e9753fe379504ddbb2229357230133..4fba4befb9a355fede1bea3bf2730022d9a36665 100644 (file)
@@ -24,7 +24,7 @@ import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
 import org.opendaylight.controller.eos.akka.owner.supervisor.IdleSupervisor;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
-import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistryInit;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
@@ -42,7 +42,7 @@ public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
         final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next();
 
         listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
-        candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
+        candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry");
         ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
 
         final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
index 91b06342b945086f66b9436e5f54a9f7410d1a17..c1cf65b456f2ac0cf5f9d3d6696f2362d4796fe9 100644 (file)
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
@@ -84,6 +85,10 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     // reverse lookup of owner to entity
     private final Multimap<String, DOMEntity> ownerToEntity = HashMultimap.create();
 
+    // only reassign owner for those entities that lost this candidate or is not reachable
+    private final BiPredicate<DOMEntity, String> reassignPredicate = (entity, candidate) ->
+            !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate);
+
     private OwnerSupervisor(final ActorContext<OwnerSupervisorCommand> context,
                             final Map<DOMEntity, Set<String>> currentCandidates,
                             final Map<DOMEntity, String> currentOwners) {
@@ -168,7 +173,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         }
 
         for (final String owner : ownersToReassign) {
-            reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)));
+            reassignCandidatesFor(owner, ImmutableList.copyOf(ownerToEntity.get(owner)), reassignPredicate);
         }
     }
 
@@ -249,7 +254,8 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
 
         // then reassign those that need new owners
         for (final String toReassign : ownersToReassign) {
-            reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)));
+            reassignCandidatesFor(toReassign, ImmutableList.copyOf(ownerToEntity.get(toReassign)),
+                    reassignPredicate);
         }
 
         if (currentCandidates.get(entity) == null) {
@@ -259,19 +265,27 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         }
     }
 
-    private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities) {
+    private void reassignCandidatesFor(final String oldOwner, final Collection<DOMEntity> entities,
+                                       final BiPredicate<DOMEntity, String> predicate) {
         LOG.debug("Reassigning owners for {}", entities);
         for (final DOMEntity entity : entities) {
 
-            // only reassign owner for those entities that lost this candidate or is not reachable
-            if (!activeMembers.contains(oldOwner)
-                    || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) {
+
+            if (predicate.test(entity, oldOwner)) {
                 ownerToEntity.remove(oldOwner, entity);
                 assignOwnerFor(entity);
             }
         }
     }
 
+    private boolean isActiveCandidate(final String candidate) {
+        return activeMembers.contains(candidate);
+    }
+
+    private boolean isCandidateFor(final DOMEntity entity, final String candidate) {
+        return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate);
+    }
+
     private void assignOwnerFor(final DOMEntity entity) {
         final Set<String> candidatesForEntity = currentCandidates.get(entity);
         if (candidatesForEntity.isEmpty()) {
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java
new file mode 100644 (file)
index 0000000..34cfe78
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.actor.typed.javadsl.StashBuffer;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.cluster.typed.Cluster;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CandidateRegistryInit extends AbstractBehavior<CandidateRegistryCommand> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistryInit.class);
+
+    private static final String DATACENTER_PREFIX = "dc-";
+
+    private final StashBuffer<CandidateRegistryCommand> stash;
+    private final ReplicatorMessageAdapter<CandidateRegistryCommand,
+            ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+    private final String selfRole;
+    private final SelfUniqueAddress node;
+
+    public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
+                                 final StashBuffer<CandidateRegistryCommand> stash,
+                                 final ReplicatorMessageAdapter<CandidateRegistryCommand,
+                                         ORMap<DOMEntity, ORSet<String>>> candidateReplicator) {
+        super(ctx);
+        this.stash = stash;
+        this.candidateReplicator = candidateReplicator;
+        selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
+
+        this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
+
+
+        this.candidateReplicator.askGet(
+                askReplyTo -> new Replicator.Get<>(
+                        CandidateRegistry.KEY,
+                        new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo),
+                InitialCandidateSync::new);
+
+        LOG.debug("CandidateRegistry syncing behavior started.");
+    }
+
+    public static Behavior<CandidateRegistryCommand> create() {
+        return Behaviors.withStash(100,
+                stash ->
+                        Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter(
+                                (ReplicatorMessageAdapter<CandidateRegistryCommand,
+                                        ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) ->
+                                        new CandidateRegistryInit(ctx, stash, replicatorAdapter))));
+    }
+
+    @Override
+    public Receive<CandidateRegistryCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(InitialCandidateSync.class, this::handleCandidateSync)
+                .onMessage(RegisterCandidate.class, this::stashCommand)
+                .onMessage(UnregisterCandidate.class, this::stashCommand)
+                .build();
+    }
+
+    private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
+        stash.stash(command);
+        return this;
+    }
+
+    private Behavior<CandidateRegistryCommand> handleCandidateSync(final InitialCandidateSync command) {
+        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = command.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            clearExistingCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
+        }
+        // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure
+        // from distributed data
+        return switchToCandidateRegistry();
+    }
+
+    private void clearExistingCandidates(final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
+        final Map<DOMEntity, ORSet<String>> entitiesToCandidates = response.get(response.key()).getEntries();
+
+        for (Map.Entry<DOMEntity, ORSet<String>> entry : entitiesToCandidates.entrySet()) {
+            if (entry.getValue().getElements().contains(selfRole)) {
+                LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}",
+                        selfRole, entry.getKey(), entry.getValue().getElements());
+                clearRegistration(entry.getKey());
+            }
+        }
+    }
+
+    private void clearRegistration(final DOMEntity entity) {
+        candidateReplicator.askUpdate(
+                askReplyTo -> new Replicator.Update<>(
+                        CandidateRegistry.KEY,
+                        ORMap.empty(),
+                        Replicator.writeLocal(),
+                        askReplyTo,
+                        map -> map.update(node, entity, ORSet.empty(),
+                                value -> value.remove(node, selfRole))),
+                InternalUpdateResponse::new);
+    }
+
+    private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
+        LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry.");
+        return stash.unstashAll(CandidateRegistry.create());
+    }
+
+    private static String extractRole(final Set<String> roles) {
+        return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+                .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java
new file mode 100644 (file)
index 0000000..2760d2e
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class InitialCandidateSync extends CandidateRegistryCommand {
+
+    private final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+
+    public InitialCandidateSync(final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+        this.response = response;
+    }
+
+    public Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+}