X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2Fregistry%2Fcandidate%2FCandidateRegistryInit.java;h=f9ca06896e67f3cd9e7418cec4aa3a2676d308c2;hp=34cfe78bad9fc915ae702137f40e0025e2de0b78;hb=HEAD;hpb=7ce039b3e55d153fc75bc88198c49536ab83befc diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java index 34cfe78bad..f9ca06896e 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java @@ -7,28 +7,25 @@ */ package org.opendaylight.controller.eos.akka.registry.candidate; +import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.actor.typed.javadsl.StashBuffer; -import akka.cluster.ddata.ORMap; -import akka.cluster.ddata.ORSet; -import akka.cluster.ddata.SelfUniqueAddress; -import akka.cluster.ddata.typed.javadsl.DistributedData; -import akka.cluster.ddata.typed.javadsl.Replicator; -import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; -import akka.cluster.typed.Cluster; +import akka.cluster.Cluster; import java.time.Duration; -import java.util.Map; import java.util.Set; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; -import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync; -import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; +import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; -import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,91 +36,70 @@ public class CandidateRegistryInit extends AbstractBehavior stash; - private final ReplicatorMessageAdapter>> candidateReplicator; + private final ActorRef ownerSupervisor; private final String selfRole; - private final SelfUniqueAddress node; public CandidateRegistryInit(final ActorContext ctx, final StashBuffer stash, - final ReplicatorMessageAdapter>> candidateReplicator) { + final ActorRef ownerSupervisor) { super(ctx); this.stash = stash; - this.candidateReplicator = candidateReplicator; - selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles()); + this.ownerSupervisor = ownerSupervisor; + this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles()); - this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress(); + ctx.getSelf().tell(new RemovePreviousCandidates()); - - this.candidateReplicator.askGet( - askReplyTo -> new Replicator.Get<>( - CandidateRegistry.KEY, - new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo), - InitialCandidateSync::new); - - LOG.debug("CandidateRegistry syncing behavior started."); + LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole); } - public static Behavior create() { + public static Behavior create(final ActorRef ownerSupervisor) { return Behaviors.withStash(100, stash -> - Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter( - (ReplicatorMessageAdapter>> replicatorAdapter) -> - new CandidateRegistryInit(ctx, stash, replicatorAdapter)))); + Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor))); } @Override public Receive createReceive() { return newReceiveBuilder() - .onMessage(InitialCandidateSync.class, this::handleCandidateSync) + .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates) + .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry()) + .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed) .onMessage(RegisterCandidate.class, this::stashCommand) .onMessage(UnregisterCandidate.class, this::stashCommand) .build(); } - private Behavior stashCommand(final CandidateRegistryCommand command) { - stash.stash(command); + private Behavior candidateRemovalFailed(final CandidateRemovalFailed command) { + LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole, + command.getThrowable()); + getContext().getSelf().tell(new RemovePreviousCandidates()); return this; } - private Behavior handleCandidateSync(final InitialCandidateSync command) { - final Replicator.GetResponse>> response = command.getResponse(); - if (response instanceof Replicator.GetSuccess) { - clearExistingCandidates((Replicator.GetSuccess>>) response); - } - // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure - // from distributed data - return switchToCandidateRegistry(); - } - - private void clearExistingCandidates(final Replicator.GetSuccess>> response) { - final Map> entitiesToCandidates = response.get(response.key()).getEntries(); + private Behavior onRemoveCandidates(final RemovePreviousCandidates command) { + LOG.debug("Sending RemovePreviousCandidates."); + getContext().ask(ClearCandidatesResponse.class, + ownerSupervisor, Duration.ofSeconds(5), + ref -> new ClearCandidatesForMember(ref, selfRole), + (response, throwable) -> { + if (response != null) { + return new CandidateRemovalFinished(); + } else { + return new CandidateRemovalFailed(throwable); + } + }); - for (Map.Entry> entry : entitiesToCandidates.entrySet()) { - if (entry.getValue().getElements().contains(selfRole)) { - LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}", - selfRole, entry.getKey(), entry.getValue().getElements()); - clearRegistration(entry.getKey()); - } - } + return this; } - private void clearRegistration(final DOMEntity entity) { - candidateReplicator.askUpdate( - askReplyTo -> new Replicator.Update<>( - CandidateRegistry.KEY, - ORMap.empty(), - Replicator.writeLocal(), - askReplyTo, - map -> map.update(node, entity, ORSet.empty(), - value -> value.remove(node, selfRole))), - InternalUpdateResponse::new); + private Behavior stashCommand(final CandidateRegistryCommand command) { + LOG.debug("Stashing {}", command); + stash.stash(command); + return this; } private Behavior switchToCandidateRegistry() { - LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry."); + LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole); return stash.unstashAll(CandidateRegistry.create()); }