Move {Identifiable,Persistent,}Payload
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / candidate / CandidateRegistryInit.java
index 34cfe78..f9ca068 100644 (file)
@@ -7,28 +7,25 @@
  */
 package org.opendaylight.controller.eos.akka.registry.candidate;
 
+import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
 import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
 import akka.actor.typed.javadsl.StashBuffer;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.ORSet;
-import akka.cluster.ddata.SelfUniqueAddress;
-import akka.cluster.ddata.typed.javadsl.DistributedData;
-import akka.cluster.ddata.typed.javadsl.Replicator;
-import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
-import akka.cluster.typed.Cluster;
+import akka.cluster.Cluster;
 import java.time.Duration;
-import java.util.Map;
 import java.util.Set;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,91 +36,70 @@ public class CandidateRegistryInit extends AbstractBehavior<CandidateRegistryCom
     private static final String DATACENTER_PREFIX = "dc-";
 
     private final StashBuffer<CandidateRegistryCommand> stash;
-    private final ReplicatorMessageAdapter<CandidateRegistryCommand,
-            ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
     private final String selfRole;
-    private final SelfUniqueAddress node;
 
     public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
                                  final StashBuffer<CandidateRegistryCommand> stash,
-                                 final ReplicatorMessageAdapter<CandidateRegistryCommand,
-                                         ORMap<DOMEntity, ORSet<String>>> candidateReplicator) {
+                                 final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
         super(ctx);
         this.stash = stash;
-        this.candidateReplicator = candidateReplicator;
-        selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
+        this.ownerSupervisor = ownerSupervisor;
+        this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
 
-        this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
+        ctx.getSelf().tell(new RemovePreviousCandidates());
 
-
-        this.candidateReplicator.askGet(
-                askReplyTo -> new Replicator.Get<>(
-                        CandidateRegistry.KEY,
-                        new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo),
-                InitialCandidateSync::new);
-
-        LOG.debug("CandidateRegistry syncing behavior started.");
+        LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole);
     }
 
-    public static Behavior<CandidateRegistryCommand> create() {
+    public static Behavior<CandidateRegistryCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
         return Behaviors.withStash(100,
                 stash ->
-                        Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter(
-                                (ReplicatorMessageAdapter<CandidateRegistryCommand,
-                                        ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) ->
-                                        new CandidateRegistryInit(ctx, stash, replicatorAdapter))));
+                        Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor)));
     }
 
     @Override
     public Receive<CandidateRegistryCommand> createReceive() {
         return newReceiveBuilder()
-                .onMessage(InitialCandidateSync.class, this::handleCandidateSync)
+                .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates)
+                .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry())
+                .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed)
                 .onMessage(RegisterCandidate.class, this::stashCommand)
                 .onMessage(UnregisterCandidate.class, this::stashCommand)
                 .build();
     }
 
-    private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
-        stash.stash(command);
+    private Behavior<CandidateRegistryCommand> candidateRemovalFailed(final CandidateRemovalFailed command) {
+        LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole,
+                command.getThrowable());
+        getContext().getSelf().tell(new RemovePreviousCandidates());
         return this;
     }
 
-    private Behavior<CandidateRegistryCommand> handleCandidateSync(final InitialCandidateSync command) {
-        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = command.getResponse();
-        if (response instanceof Replicator.GetSuccess) {
-            clearExistingCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
-        }
-        // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure
-        // from distributed data
-        return switchToCandidateRegistry();
-    }
-
-    private void clearExistingCandidates(final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
-        final Map<DOMEntity, ORSet<String>> entitiesToCandidates = response.get(response.key()).getEntries();
+    private Behavior<CandidateRegistryCommand> onRemoveCandidates(final RemovePreviousCandidates command) {
+        LOG.debug("Sending RemovePreviousCandidates.");
+        getContext().ask(ClearCandidatesResponse.class,
+                ownerSupervisor, Duration.ofSeconds(5),
+                ref -> new ClearCandidatesForMember(ref, selfRole),
+                (response, throwable) -> {
+                    if (response != null) {
+                        return new CandidateRemovalFinished();
+                    } else {
+                        return new CandidateRemovalFailed(throwable);
+                    }
+                });
 
-        for (Map.Entry<DOMEntity, ORSet<String>> entry : entitiesToCandidates.entrySet()) {
-            if (entry.getValue().getElements().contains(selfRole)) {
-                LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}",
-                        selfRole, entry.getKey(), entry.getValue().getElements());
-                clearRegistration(entry.getKey());
-            }
-        }
+        return this;
     }
 
-    private void clearRegistration(final DOMEntity entity) {
-        candidateReplicator.askUpdate(
-                askReplyTo -> new Replicator.Update<>(
-                        CandidateRegistry.KEY,
-                        ORMap.empty(),
-                        Replicator.writeLocal(),
-                        askReplyTo,
-                        map -> map.update(node, entity, ORSet.empty(),
-                                value -> value.remove(node, selfRole))),
-                InternalUpdateResponse::new);
+    private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
+        LOG.debug("Stashing {}", command);
+        stash.stash(command);
+        return this;
     }
 
     private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
-        LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry.");
+        LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole);
         return stash.unstashAll(CandidateRegistry.create());
     }