Fix clearing of candidates from previous iterations 30/99130/8 master
authorTomas Cere <tomas.cere@pantheon.tech>
Tue, 21 Dec 2021 16:16:40 +0000 (17:16 +0100)
committerRobert Varga <nite@hq.sk>
Thu, 20 Jan 2022 12:08:07 +0000 (12:08 +0000)
During initialization the CandidateRegistry wasnt clearing
candidates from the previous instance in a reliable way.
Event with ReadMajority if the cluster members havent been updated
yet, it acted as if there were no candidates present so it did
not clear them.

Introduce a mechansim that goes through the akka singleton that
picks owners, so it only executes the clearing of candidates once
the cluster is in a healthy state.

JIRA: CONTROLLER-2025
Change-Id: I5a43908e45273adfea325621189f633ce493ca87
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
19 files changed:
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/AbstractSupervisor.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/CandidateCleaner.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidates.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java with 58% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesForMember.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesResponse.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesUpdateResponse.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistry.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRemovalFailed.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRemovalFinished.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RemovePreviousCandidates.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java
opendaylight/md-sal/eos-dom-akka/src/test/resources/application.conf

index be1415e..dfe059a 100644 (file)
@@ -43,13 +43,14 @@ public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
         final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next();
 
         listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
-        candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry");
 
         final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
         // start the initial sync behavior that switches to the regular one after syncing
         ownerSupervisor = clusterSingleton.init(
                 SingletonActor.of(IdleSupervisor.create(iidCodec), "OwnerSupervisor"));
 
+        candidateRegistry = context.spawn(CandidateRegistryInit.create(ownerSupervisor), "CandidateRegistry");
+
         ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, iidCodec),
                 "OwnerStateChecker");
     }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/AbstractSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/AbstractSupervisor.java
new file mode 100644 (file)
index 0000000..27907fd
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+
+abstract class AbstractSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+
+    final ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+
+    AbstractSupervisor(final ActorContext<OwnerSupervisorCommand> context) {
+        super(context);
+
+        final ActorRef<Replicator.Command> replicator = DistributedData.get(getContext().getSystem()).replicator();
+        candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, Duration.ofSeconds(5));
+    }
+
+    Behavior<OwnerSupervisorCommand> onClearCandidatesForMember(final ClearCandidatesForMember command) {
+        getLogger().debug("Clearing candidates for member: {}", command.getCandidate());
+
+        candidateReplicator.askGet(
+                askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY,
+                        new Replicator.ReadMajority(Duration.ofSeconds(15)), askReplyTo),
+                response -> new ClearCandidates(response, command));
+
+        return this;
+    }
+
+    Behavior<OwnerSupervisorCommand> finishClearCandidates(final ClearCandidates command) {
+        if (command.getResponse() instanceof Replicator.GetSuccess) {
+            getLogger().debug("Retrieved candidate data, clearing candidates for {}",
+                    command.getOriginalMessage().getCandidate());
+
+            getContext().spawnAnonymous(CandidateCleaner.create()).tell(command);
+        } else {
+            getLogger().debug("Unable to retrieve candidate data for {}, no candidates present sending empty reply",
+                    command.getOriginalMessage().getCandidate());
+            command.getOriginalMessage().getReplyTo().tell(new ClearCandidatesResponse());
+        }
+
+        return this;
+    }
+
+    abstract Logger getLogger();
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/CandidateCleaner.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/CandidateCleaner.java
new file mode 100644 (file)
index 0000000..8ce9adb
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import java.util.Map;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesUpdateResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor that can be spawned by all the supervisor implementations that executes clearing of candidates once
+ * candidate retrieval succeeds. Once candidates for the member are cleared(or immediately if none need to be cleared),
+ * the actor stops itself.
+ */
+public final class CandidateCleaner extends AbstractBehavior<OwnerSupervisorCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(CandidateCleaner.class);
+
+    private final ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+    private final SelfUniqueAddress node;
+
+    private int remaining = 0;
+
+    private CandidateCleaner(final ActorContext<OwnerSupervisorCommand> context) {
+        super(context);
+
+        final ActorRef<Replicator.Command> replicator = DistributedData.get(getContext().getSystem()).replicator();
+        candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, Duration.ofSeconds(5));
+        node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+
+    }
+
+    public static Behavior<OwnerSupervisorCommand> create() {
+        return Behaviors.setup(CandidateCleaner::new);
+    }
+
+    @Override
+    public Receive<OwnerSupervisorCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(ClearCandidates.class, this::onClearCandidates)
+                .onMessage(ClearCandidatesUpdateResponse.class, this::onClearCandidatesUpdateResponse)
+                .build();
+    }
+
+    private Behavior<OwnerSupervisorCommand> onClearCandidates(final ClearCandidates command) {
+        LOG.debug("Clearing candidates for member: {}", command.getOriginalMessage().getCandidate());
+
+        final ORMap<DOMEntity, ORSet<String>> candidates =
+                ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) command.getResponse())
+                        .get(CandidateRegistry.KEY);
+
+        for (final Map.Entry<DOMEntity, ORSet<String>> entry : candidates.getEntries().entrySet()) {
+            if (entry.getValue().contains(command.getOriginalMessage().getCandidate())) {
+                LOG.debug("Removing {} from {}", command.getOriginalMessage().getCandidate(), entry.getKey());
+
+                remaining++;
+                candidateReplicator.askUpdate(
+                        askReplyTo -> new Replicator.Update<>(
+                                CandidateRegistry.KEY,
+                                ORMap.empty(),
+                                new Replicator.WriteMajority(Duration.ofSeconds(10)),
+                                askReplyTo,
+                                map -> map.update(node, entry.getKey(), ORSet.empty(),
+                                        value -> value.remove(node, command.getOriginalMessage().getCandidate()))),
+                        updateResponse -> new ClearCandidatesUpdateResponse(updateResponse,
+                                command.getOriginalMessage().getReplyTo()));
+            }
+        }
+
+        if (remaining == 0) {
+            LOG.debug("Did not clear any candidates for {}", command.getOriginalMessage().getCandidate());
+            command.getOriginalMessage().getReplyTo().tell(new ClearCandidatesResponse());
+            return Behaviors.stopped();
+        }
+        return this;
+    }
+
+    private Behavior<OwnerSupervisorCommand> onClearCandidatesUpdateResponse(
+            final ClearCandidatesUpdateResponse command) {
+        remaining--;
+        if (remaining == 0) {
+            LOG.debug("Last update response for candidate removal received, replying to: {}", command.getReplyTo());
+            command.getReplyTo().tell(new ClearCandidatesResponse());
+            return Behaviors.stopped();
+        } else {
+            LOG.debug("Have still {} outstanding requests after {}", remaining, command.getResponse());
+        }
+        return this;
+    }
+}
index 2baeb62..3028552 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.eos.akka.owner.supervisor;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
@@ -18,6 +17,8 @@ import akka.cluster.Member;
 import akka.cluster.typed.Cluster;
 import akka.pattern.StatusReply;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * in the primary datacenter, or is activated on demand. Once the supervisor instance is no longer needed in the
  * secondary datacenter it needs to be deactivated manually.
  */
-public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class IdleSupervisor extends AbstractSupervisor {
     private static final Logger LOG = LoggerFactory.getLogger(IdleSupervisor.class);
 
     private static final String DATACENTER_PREFIX = "dc-";
@@ -56,7 +57,6 @@ public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorComman
     }
 
     public static Behavior<OwnerSupervisorCommand> create(final BindingInstanceIdentifierCodec iidCodec) {
-
         return Behaviors.setup(context -> new IdleSupervisor(context, iidCodec));
     }
 
@@ -67,6 +67,8 @@ public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorComman
                 .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
                 .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
                 .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+                .onMessage(ClearCandidates.class, this::finishClearCandidates)
                 .build();
     }
 
@@ -82,10 +84,15 @@ public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorComman
         return OwnerSyncer.create(message.getReplyTo(), iidCodec);
     }
 
-    private String extractDatacenterRole(final Member selfMember) {
+    private static String extractDatacenterRole(final Member selfMember) {
         return selfMember.getRoles().stream()
                 .filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst()
                 .orElseThrow(() -> new IllegalArgumentException(selfMember + " does not have a valid role"));
     }
+
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }
index 9841b65..7a4f91f 100644 (file)
@@ -12,7 +12,6 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
@@ -47,6 +46,8 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
@@ -73,7 +74,7 @@ import scala.collection.JavaConverters;
  * registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
  * On cluster up/down etc. events the owners are reassigned if possible.
  */
-public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSupervisor extends AbstractSupervisor {
 
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
     private static final String DATACENTER_PREFIX = "dc-";
@@ -152,8 +153,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 });
         cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
 
-        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
-            Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+        candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
 
         LOG.debug("Owner Supervisor started");
     }
@@ -176,6 +176,8 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
                 .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
                 .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
+                .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+                .onMessage(ClearCandidates.class, this::finishClearCandidates)
                 .build();
     }
 
@@ -193,7 +195,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     private void reassignUnreachableOwners() {
         final Set<String> ownersToReassign = new HashSet<>();
         for (final String owner : ownerToEntity.keys()) {
-            if (!activeMembers.contains(owner)) {
+            if (!isActiveCandidate(owner)) {
                 ownersToReassign.add(owner);
             }
         }
@@ -259,8 +261,10 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
                 currentCandidates.get(entity).add(toCheck);
 
-                if (!currentOwners.containsKey(entity)) {
-                    // might as well assign right away when we don't have an owner
+                final String currentOwner = currentOwners.get(entity);
+
+                if (currentOwner == null || !activeMembers.contains(currentOwner)) {
+                    // might as well assign right away when we don't have an owner or its unreachable
                     assignOwnerFor(entity);
                 }
 
@@ -296,6 +300,13 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         LOG.debug("Reassigning owners for {}", entities);
         for (final DOMEntity entity : entities) {
             if (predicate.test(entity, oldOwner)) {
+
+                if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) {
+                    // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY
+                    // candidate
+                    LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity);
+                    continue;
+                }
                 ownerToEntity.remove(oldOwner, entity);
                 assignOwnerFor(entity);
             }
@@ -310,6 +321,10 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
     }
 
+    private boolean hasSingleCandidate(final DOMEntity entity) {
+        return currentCandidates.getOrDefault(entity, Set.of()).size() == 1;
+    }
+
     private void assignOwnerFor(final DOMEntity entity) {
         final Set<String> candidatesForEntity = currentCandidates.get(entity);
         if (candidatesForEntity.isEmpty()) {
@@ -465,4 +480,9 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
+
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }
index 092f532..32a0a64 100644 (file)
@@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull;
 
 import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
@@ -29,6 +28,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
@@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
  * sync has finished.
  */
-public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSyncer extends AbstractSupervisor {
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
 
     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
@@ -72,8 +73,7 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
 
         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
 
-        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
-            Duration.ofSeconds(5)).askGet(
+        candidateReplicator.askGet(
                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
                 InitialCandidateSync::new);
 
@@ -95,6 +95,8 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
                 .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
                 .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
                 .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+                .onMessage(ClearCandidates.class, this::finishClearCandidates)
                 .build();
     }
 
@@ -176,4 +178,9 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
     private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
         LOG.debug("Owner not found. {}", rsp);
     }
+
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }
@@ -5,22 +5,29 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.eos.akka.registry.candidate.command;
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
 
 import akka.cluster.ddata.ORMap;
 import akka.cluster.ddata.ORSet;
 import akka.cluster.ddata.typed.javadsl.Replicator;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 
-public class InitialCandidateSync extends CandidateRegistryCommand {
+public class ClearCandidates extends OwnerSupervisorCommand {
 
     private final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+    private final ClearCandidatesForMember originalMessage;
 
-    public InitialCandidateSync(final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+    public ClearCandidates(final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response,
+                           final ClearCandidatesForMember originalMessage) {
         this.response = response;
+        this.originalMessage = originalMessage;
     }
 
     public Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
         return response;
     }
+
+    public ClearCandidatesForMember getOriginalMessage() {
+        return originalMessage;
+    }
 }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesForMember.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesForMember.java
new file mode 100644 (file)
index 0000000..1e27cb5
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import java.io.Serializable;
+
+/**
+ * Request sent from Candidate registration actors to clear the candidate from all entities. Issued at start to clear
+ * candidates from previous iteration of a node. Owner supervisor responds to this request to notify the registration
+ * actor it can start up and process candidate requests.
+ */
+public class ClearCandidatesForMember extends OwnerSupervisorCommand implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ActorRef<ClearCandidatesResponse> replyTo;
+    private final String candidate;
+
+    public ClearCandidatesForMember(final ActorRef<ClearCandidatesResponse> replyTo, final String candidate) {
+        this.replyTo = replyTo;
+        this.candidate = candidate;
+    }
+
+    public ActorRef<ClearCandidatesResponse> getReplyTo() {
+        return replyTo;
+    }
+
+    public String getCandidate() {
+        return candidate;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesResponse.java
new file mode 100644 (file)
index 0000000..7399bd8
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+/**
+ * Response sent from OwnerSupervisor to the ClearCandidatesForMember request, notifying the caller that removal has
+ * finished.
+ */
+public class ClearCandidatesResponse implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesUpdateResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ClearCandidatesUpdateResponse.java
new file mode 100644 (file)
index 0000000..9f48323
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class ClearCandidatesUpdateResponse extends OwnerSupervisorCommand {
+    private final Replicator.UpdateResponse<ORMap<DOMEntity, ORSet<String>>> response;
+    private final ActorRef<ClearCandidatesResponse> replyTo;
+
+    public ClearCandidatesUpdateResponse(final Replicator.UpdateResponse<ORMap<DOMEntity, ORSet<String>>> response,
+                                         final ActorRef<ClearCandidatesResponse> replyTo) {
+        this.response = response;
+        this.replyTo = replyTo;
+    }
+
+    public Replicator.UpdateResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+
+
+    public ActorRef<ClearCandidatesResponse> getReplyTo() {
+        return replyTo;
+    }
+}
index 16c2ab6..03ecbae 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
+import akka.cluster.Cluster;
 import akka.cluster.ddata.Key;
 import akka.cluster.ddata.ORMap;
 import akka.cluster.ddata.ORMapKey;
@@ -20,6 +21,7 @@ import akka.cluster.ddata.SelfUniqueAddress;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
 import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.util.Set;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
@@ -35,10 +37,13 @@ public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryC
 
     private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
 
+    private static final String DATACENTER_PREFIX = "dc-";
+
     public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
 
     private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
     private final SelfUniqueAddress node;
+    private final String selfRole;
 
     private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
                               final ReplicatorMessageAdapter<CandidateRegistryCommand,
@@ -47,8 +52,9 @@ public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryC
         this.replicatorAdapter = replicatorAdapter;
 
         this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+        this.selfRole = extractRole(Cluster.get(context.getSystem()).selfMember().getRoles());
 
-        LOG.debug("Candidate registry started");
+        LOG.debug("{} : Candidate registry started", selfRole);
     }
 
     public static Behavior<CandidateRegistryCommand> create() {
@@ -69,7 +75,7 @@ public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryC
     }
 
     private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
-        LOG.debug("Registering candidate({}) for entity: {}",
+        LOG.debug("{} - Registering candidate({}) for entity: {}", selfRole,
                 registerCandidate.getCandidate(), registerCandidate.getEntity());
         replicatorAdapter.askUpdate(
                 askReplyTo -> new Replicator.Update<>(
@@ -84,7 +90,7 @@ public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryC
     }
 
     private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
-        LOG.debug("Removing candidate({}) from entity: {}",
+        LOG.debug("{} - Removing candidate({}) from entity: {}", selfRole,
                 unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
         replicatorAdapter.askUpdate(
                 askReplyTo -> new Replicator.Update<>(
@@ -99,7 +105,12 @@ public final class CandidateRegistry extends AbstractBehavior<CandidateRegistryC
     }
 
     private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
-        LOG.debug("Received update response: {}", updateResponse.getRsp());
+        LOG.debug("{} : Received update response: {}", selfRole, updateResponse.getRsp());
         return this;
     }
+
+    private static String extractRole(final Set<String> roles) {
+        return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+                .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+    }
 }
index 34cfe78..f9ca068 100644 (file)
@@ -7,28 +7,25 @@
  */
 package org.opendaylight.controller.eos.akka.registry.candidate;
 
+import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
 import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
 import akka.actor.typed.javadsl.StashBuffer;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.ORSet;
-import akka.cluster.ddata.SelfUniqueAddress;
-import akka.cluster.ddata.typed.javadsl.DistributedData;
-import akka.cluster.ddata.typed.javadsl.Replicator;
-import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
-import akka.cluster.typed.Cluster;
+import akka.cluster.Cluster;
 import java.time.Duration;
-import java.util.Map;
 import java.util.Set;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,91 +36,70 @@ public class CandidateRegistryInit extends AbstractBehavior<CandidateRegistryCom
     private static final String DATACENTER_PREFIX = "dc-";
 
     private final StashBuffer<CandidateRegistryCommand> stash;
-    private final ReplicatorMessageAdapter<CandidateRegistryCommand,
-            ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
     private final String selfRole;
-    private final SelfUniqueAddress node;
 
     public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
                                  final StashBuffer<CandidateRegistryCommand> stash,
-                                 final ReplicatorMessageAdapter<CandidateRegistryCommand,
-                                         ORMap<DOMEntity, ORSet<String>>> candidateReplicator) {
+                                 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
         super(ctx);
         this.stash = stash;
-        this.candidateReplicator = candidateReplicator;
-        selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
+        this.ownerSupervisor = ownerSupervisor;
+        this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
 
-        this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
+        ctx.getSelf().tell(new RemovePreviousCandidates());
 
-
-        this.candidateReplicator.askGet(
-                askReplyTo -> new Replicator.Get<>(
-                        CandidateRegistry.KEY,
-                        new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo),
-                InitialCandidateSync::new);
-
-        LOG.debug("CandidateRegistry syncing behavior started.");
+        LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole);
     }
 
-    public static Behavior<CandidateRegistryCommand> create() {
+    public static Behavior<CandidateRegistryCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
         return Behaviors.withStash(100,
                 stash ->
-                        Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter(
-                                (ReplicatorMessageAdapter<CandidateRegistryCommand,
-                                        ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) ->
-                                        new CandidateRegistryInit(ctx, stash, replicatorAdapter))));
+                        Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor)));
     }
 
     @Override
     public Receive<CandidateRegistryCommand> createReceive() {
         return newReceiveBuilder()
-                .onMessage(InitialCandidateSync.class, this::handleCandidateSync)
+                .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates)
+                .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry())
+                .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed)
                 .onMessage(RegisterCandidate.class, this::stashCommand)
                 .onMessage(UnregisterCandidate.class, this::stashCommand)
                 .build();
     }
 
-    private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
-        stash.stash(command);
+    private Behavior<CandidateRegistryCommand> candidateRemovalFailed(final CandidateRemovalFailed command) {
+        LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole,
+                command.getThrowable());
+        getContext().getSelf().tell(new RemovePreviousCandidates());
         return this;
     }
 
-    private Behavior<CandidateRegistryCommand> handleCandidateSync(final InitialCandidateSync command) {
-        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = command.getResponse();
-        if (response instanceof Replicator.GetSuccess) {
-            clearExistingCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
-        }
-        // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure
-        // from distributed data
-        return switchToCandidateRegistry();
-    }
-
-    private void clearExistingCandidates(final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
-        final Map<DOMEntity, ORSet<String>> entitiesToCandidates = response.get(response.key()).getEntries();
+    private Behavior<CandidateRegistryCommand> onRemoveCandidates(final RemovePreviousCandidates command) {
+        LOG.debug("Sending RemovePreviousCandidates.");
+        getContext().ask(ClearCandidatesResponse.class,
+                ownerSupervisor, Duration.ofSeconds(5),
+                ref -> new ClearCandidatesForMember(ref, selfRole),
+                (response, throwable) -> {
+                    if (response != null) {
+                        return new CandidateRemovalFinished();
+                    } else {
+                        return new CandidateRemovalFailed(throwable);
+                    }
+                });
 
-        for (Map.Entry<DOMEntity, ORSet<String>> entry : entitiesToCandidates.entrySet()) {
-            if (entry.getValue().getElements().contains(selfRole)) {
-                LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}",
-                        selfRole, entry.getKey(), entry.getValue().getElements());
-                clearRegistration(entry.getKey());
-            }
-        }
+        return this;
     }
 
-    private void clearRegistration(final DOMEntity entity) {
-        candidateReplicator.askUpdate(
-                askReplyTo -> new Replicator.Update<>(
-                        CandidateRegistry.KEY,
-                        ORMap.empty(),
-                        Replicator.writeLocal(),
-                        askReplyTo,
-                        map -> map.update(node, entity, ORSet.empty(),
-                                value -> value.remove(node, selfRole))),
-                InternalUpdateResponse::new);
+    private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
+        LOG.debug("Stashing {}", command);
+        stash.stash(command);
+        return this;
     }
 
     private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
-        LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry.");
+        LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole);
         return stash.unstashAll(CandidateRegistry.create());
     }
 
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRemovalFailed.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRemovalFailed.java
new file mode 100644 (file)
index 0000000..0410942
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+public class CandidateRemovalFailed extends CandidateRegistryCommand {
+
+    private final Throwable throwable;
+
+    public CandidateRemovalFailed(final Throwable throwable) {
+        this.throwable = throwable;
+    }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRemovalFinished.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/CandidateRemovalFinished.java
new file mode 100644 (file)
index 0000000..3c42b10
--- /dev/null
@@ -0,0 +1,11 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+public class CandidateRemovalFinished extends CandidateRegistryCommand {
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RemovePreviousCandidates.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/RemovePreviousCandidates.java
new file mode 100644 (file)
index 0000000..9e1da1e
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+/**
+ * Message sent to candidate registry initial behavior by self to trigger and retrigger(in case of failures) removal
+ * of candidates registered by the previous iteration of this node.
+ */
+public class RemovePreviousCandidates extends CandidateRegistryCommand {
+}
index 27b4bcb..c4e97d6 100644 (file)
@@ -19,6 +19,8 @@ import akka.actor.typed.javadsl.AskPattern;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
 import akka.cluster.ddata.typed.javadsl.Replicator;
 import com.typesafe.config.Config;
@@ -44,6 +46,7 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReach
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -263,7 +266,7 @@ public abstract class AbstractNativeEosTest {
     }
 
     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
-        await().until(() -> {
+        await().atMost(Duration.ofSeconds(15)).until(() -> {
             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
                     AskPattern.ask(distributedData.replicator(),
@@ -284,6 +287,32 @@ public abstract class AbstractNativeEosTest {
         });
     }
 
+    protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
+                                                     final String candidate) {
+        await().atMost(Duration.ofSeconds(15)).until(() -> {
+            final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
+
+            final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
+                    AskPattern.ask(distributedData.replicator(),
+                            replyTo -> new Replicator.Get<>(
+                                    CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
+                            Duration.ofSeconds(5),
+                            clusterNode.getActorSystem().scheduler());
+
+            final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
+                    ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+            if (response instanceof Replicator.GetSuccess) {
+                final Map<DOMEntity, ORSet<String>> entries =
+                        ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
+
+                return entries.get(entity).contains(candidate);
+
+            }
+            return false;
+        });
+    }
+
     protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
         final CompletionStage<OwnerSupervisorReply> ask =
                 AskPattern.ask(clusterNode.getOwnerSupervisor(),
@@ -306,7 +335,7 @@ public abstract class AbstractNativeEosTest {
                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
         await().until(() -> !listener.getChanges().isEmpty());
 
-        await().untilAsserted(() -> {
+        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
             final List<DOMEntityOwnershipChange> changes = listener.getChanges();
             final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
             assertEquals(entity, domEntityOwnershipChange.getEntity());
index 28566c7..e4927ca 100644 (file)
@@ -60,7 +60,6 @@ public class DataCentersTest extends AbstractNativeEosTest {
     public void testDatacenterActivation() throws Exception {
         registerCandidates(node1, ENTITY_1, "member-1");
         registerCandidates(node3, ENTITY_1, "member-3");
-        registerCandidates(node4, ENTITY_1, "member-4");
 
         activateDatacenter(node1).get();
 
@@ -82,18 +81,15 @@ public class DataCentersTest extends AbstractNativeEosTest {
         verifyListenerState(listener1, ENTITY_1, true, false, false);
         verifyListenerState(listener2, ENTITY_1, true, true, false);
 
+        registerCandidates(node4, ENTITY_1, "member-4");
         unregisterCandidates(node3, ENTITY_1, "member-3");
 
         // checking index after notif so current + 1
         verifyListenerState(listener1, ENTITY_1, true, false, false);
-        verifyListenerState(listener2, ENTITY_1, true, false, true);
+        verifyListenerState(listener2, ENTITY_1, true, false, false);
 
         deactivateDatacenter(node3).get();
         activateDatacenter(node2).get();
-
-        // no candidate in dc-primary so no owners after datacenter activation
-        verifyListenerState(listener1, ENTITY_1, false, false, false);
-        verifyListenerState(listener2, ENTITY_1, false, false, false);
     }
 
     @Test
@@ -102,9 +98,13 @@ public class DataCentersTest extends AbstractNativeEosTest {
         registerCandidates(node3, ENTITY_1, "member-3");
         registerCandidates(node4, ENTITY_1, "member-4");
 
+        waitUntillCandidatePresent(node1, ENTITY_1, "member-1");
+        waitUntillCandidatePresent(node1, ENTITY_1, "member-3");
+        waitUntillCandidatePresent(node1, ENTITY_1, "member-4");
+
         activateDatacenter(node1).get();
 
-        waitUntillOwnerPresent(node1, ENTITY_1);
+        waitUntillOwnerPresent(node4, ENTITY_1);
         final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
         verifyListenerState(listener1, ENTITY_1, true, true, false);
 
@@ -122,6 +122,7 @@ public class DataCentersTest extends AbstractNativeEosTest {
         activateDatacenter(node3).get();
         verifyListenerState(listener2, ENTITY_1, true, true, false);
 
+        waitUntillOwnerPresent(node3, ENTITY_1);
         unregisterCandidates(node3, ENTITY_1, "member-3");
         verifyListenerState(listener2, ENTITY_1, true, false, true);
     }
index 71fffe1..7699799 100644 (file)
@@ -223,6 +223,39 @@ public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
         verifyListenerState(node1Listener, ENTITY_1, true, false, false);
     }
 
+    @Test
+    public void testOwnerNotReassignedWhenOnlyCandidate() throws Exception {
+        startNode3();
+        final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+        final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1);
+        verifyNoNotifications(listener1);
+        verifyNoNotifications(listener2);
+
+        registerCandidates(node3, ENTITY_1, "member-3");
+        waitUntillOwnerPresent(node1, ENTITY_1);
+
+        MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1);
+        verifyListenerState(listener1, ENTITY_1, true, false, false);
+        verifyListenerState(listener3, ENTITY_1, true, true, false);
+
+        ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+
+        verifyListenerState(listener1, ENTITY_1, true, false, false);
+        verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+        startNode3();
+        verifyListenerState(listener1, ENTITY_1, false, false, false);
+
+        listener3 = registerListener(node3, ENTITY_1);
+        verifyListenerState(listener3, ENTITY_1, false, false, false);
+
+        registerCandidates(node1, ENTITY_1, "member-1");
+
+        verifyListenerState(listener1, ENTITY_1, true, true, false);
+        verifyListenerState(listener3, ENTITY_1, true, false, false);
+
+    }
+
     private void startNode3() throws Exception {
         startNode3(3);
     }
@@ -232,7 +265,7 @@ public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
 
         // need to wait until all nodes are ready
         final Cluster cluster = Cluster.get(node2.getActorSystem());
-        await().atMost(Duration.ofSeconds(20)).until(() -> {
+        await().atMost(Duration.ofSeconds(30)).until(() -> {
             final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
             if (members.size() != membersPresent) {
                 return false;
index ff23633..08c2a36 100644 (file)
@@ -31,7 +31,11 @@ akka {
         # This value controls how quickly Entity Ownership Service decisions are
         # propagated within a node.
         notify-subscribers-interval = 20 ms
-      }
+    }
+    split-brain-resolver {
+      active-strategy = keep-majority
+      stable-after = 7s
+    }
   }
 }