*/
package org.opendaylight.controller.eos.akka.registry.candidate;
+import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.javadsl.StashBuffer;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.ORSet;
-import akka.cluster.ddata.SelfUniqueAddress;
-import akka.cluster.ddata.typed.javadsl.DistributedData;
-import akka.cluster.ddata.typed.javadsl.Replicator;
-import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
-import akka.cluster.typed.Cluster;
+import akka.cluster.Cluster;
import java.time.Duration;
-import java.util.Map;
import java.util.Set;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final String DATACENTER_PREFIX = "dc-";
private final StashBuffer<CandidateRegistryCommand> stash;
- private final ReplicatorMessageAdapter<CandidateRegistryCommand,
- ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
private final String selfRole;
- private final SelfUniqueAddress node;
public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
final StashBuffer<CandidateRegistryCommand> stash,
- final ReplicatorMessageAdapter<CandidateRegistryCommand,
- ORMap<DOMEntity, ORSet<String>>> candidateReplicator) {
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
super(ctx);
this.stash = stash;
- this.candidateReplicator = candidateReplicator;
- selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
+ this.ownerSupervisor = ownerSupervisor;
+ this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
- this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
+ ctx.getSelf().tell(new RemovePreviousCandidates());
-
- this.candidateReplicator.askGet(
- askReplyTo -> new Replicator.Get<>(
- CandidateRegistry.KEY,
- new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo),
- InitialCandidateSync::new);
-
- LOG.debug("CandidateRegistry syncing behavior started.");
+ LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole);
}
- public static Behavior<CandidateRegistryCommand> create() {
+ public static Behavior<CandidateRegistryCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
return Behaviors.withStash(100,
stash ->
- Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter(
- (ReplicatorMessageAdapter<CandidateRegistryCommand,
- ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) ->
- new CandidateRegistryInit(ctx, stash, replicatorAdapter))));
+ Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor)));
}
@Override
public Receive<CandidateRegistryCommand> createReceive() {
return newReceiveBuilder()
- .onMessage(InitialCandidateSync.class, this::handleCandidateSync)
+ .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates)
+ .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry())
+ .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed)
.onMessage(RegisterCandidate.class, this::stashCommand)
.onMessage(UnregisterCandidate.class, this::stashCommand)
.build();
}
- private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
- stash.stash(command);
+ private Behavior<CandidateRegistryCommand> candidateRemovalFailed(final CandidateRemovalFailed command) {
+ LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole,
+ command.getThrowable());
+ getContext().getSelf().tell(new RemovePreviousCandidates());
return this;
}
- private Behavior<CandidateRegistryCommand> handleCandidateSync(final InitialCandidateSync command) {
- final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = command.getResponse();
- if (response instanceof Replicator.GetSuccess) {
- clearExistingCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
- }
- // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure
- // from distributed data
- return switchToCandidateRegistry();
- }
-
- private void clearExistingCandidates(final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
- final Map<DOMEntity, ORSet<String>> entitiesToCandidates = response.get(response.key()).getEntries();
+ private Behavior<CandidateRegistryCommand> onRemoveCandidates(final RemovePreviousCandidates command) {
+ LOG.debug("Sending RemovePreviousCandidates.");
+ getContext().ask(ClearCandidatesResponse.class,
+ ownerSupervisor, Duration.ofSeconds(5),
+ ref -> new ClearCandidatesForMember(ref, selfRole),
+ (response, throwable) -> {
+ if (response != null) {
+ return new CandidateRemovalFinished();
+ } else {
+ return new CandidateRemovalFailed(throwable);
+ }
+ });
- for (Map.Entry<DOMEntity, ORSet<String>> entry : entitiesToCandidates.entrySet()) {
- if (entry.getValue().getElements().contains(selfRole)) {
- LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}",
- selfRole, entry.getKey(), entry.getValue().getElements());
- clearRegistration(entry.getKey());
- }
- }
+ return this;
}
- private void clearRegistration(final DOMEntity entity) {
- candidateReplicator.askUpdate(
- askReplyTo -> new Replicator.Update<>(
- CandidateRegistry.KEY,
- ORMap.empty(),
- Replicator.writeLocal(),
- askReplyTo,
- map -> map.update(node, entity, ORSet.empty(),
- value -> value.remove(node, selfRole))),
- InternalUpdateResponse::new);
+ private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
+ LOG.debug("Stashing {}", command);
+ stash.stash(command);
+ return this;
}
private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
- LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry.");
+ LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole);
return stash.unstashAll(CandidateRegistry.create());
}