From 8bc6af2b8238c8df18c589267aeda163f439b4a7 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Tue, 11 May 2021 12:32:01 +0200 Subject: [PATCH] Cleanup candidate registrations from previous instance JIRA: CONTROLLER-1982 Change-Id: I5337b08f6fd951a7a2acabb935cbf2e2156f5985 Signed-off-by: Tomas Cere --- .../eos/akka/bootstrap/EOSMain.java | 4 +- .../owner/supervisor/OwnerSupervisor.java | 26 +++- .../candidate/CandidateRegistryInit.java | 134 ++++++++++++++++++ .../command/InitialCandidateSync.java | 26 ++++ 4 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java create mode 100644 opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java index 9e646fa5c7..4fba4befb9 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java @@ -24,7 +24,7 @@ import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker; import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; import org.opendaylight.controller.eos.akka.owner.supervisor.IdleSupervisor; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; -import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistryInit; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.listener.type.EntityTypeListenerRegistry; import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; @@ -42,7 +42,7 @@ public final class EOSMain extends AbstractBehavior { final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next(); listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry"); - candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry"); + candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry"); ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker"); final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem()); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java index 91b06342b9..c1cf65b456 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java @@ -38,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiPredicate; import java.util.stream.Collectors; import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated; @@ -84,6 +85,10 @@ public final class OwnerSupervisor extends AbstractBehavior ownerToEntity = HashMultimap.create(); + // only reassign owner for those entities that lost this candidate or is not reachable + private final BiPredicate reassignPredicate = (entity, candidate) -> + !isActiveCandidate(candidate) || !isCandidateFor(entity, candidate); + private OwnerSupervisor(final ActorContext context, final Map> currentCandidates, final Map currentOwners) { @@ -168,7 +173,7 @@ public final class OwnerSupervisor extends AbstractBehavior entities) { + private void reassignCandidatesFor(final String oldOwner, final Collection entities, + final BiPredicate predicate) { LOG.debug("Reassigning owners for {}", entities); for (final DOMEntity entity : entities) { - // only reassign owner for those entities that lost this candidate or is not reachable - if (!activeMembers.contains(oldOwner) - || !currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(oldOwner)) { + + if (predicate.test(entity, oldOwner)) { ownerToEntity.remove(oldOwner, entity); assignOwnerFor(entity); } } } + private boolean isActiveCandidate(final String candidate) { + return activeMembers.contains(candidate); + } + + private boolean isCandidateFor(final DOMEntity entity, final String candidate) { + return currentCandidates.getOrDefault(entity, Collections.emptySet()).contains(candidate); + } + private void assignOwnerFor(final DOMEntity entity) { final Set candidatesForEntity = currentCandidates.get(entity); if (candidatesForEntity.isEmpty()) { diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java new file mode 100644 index 0000000000..34cfe78bad --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/CandidateRegistryInit.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate; + +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.actor.typed.javadsl.StashBuffer; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.SelfUniqueAddress; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import akka.cluster.typed.Cluster; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync; +import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse; +import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; +import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CandidateRegistryInit extends AbstractBehavior { + + private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistryInit.class); + + private static final String DATACENTER_PREFIX = "dc-"; + + private final StashBuffer stash; + private final ReplicatorMessageAdapter>> candidateReplicator; + private final String selfRole; + private final SelfUniqueAddress node; + + public CandidateRegistryInit(final ActorContext ctx, + final StashBuffer stash, + final ReplicatorMessageAdapter>> candidateReplicator) { + super(ctx); + this.stash = stash; + this.candidateReplicator = candidateReplicator; + selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles()); + + this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress(); + + + this.candidateReplicator.askGet( + askReplyTo -> new Replicator.Get<>( + CandidateRegistry.KEY, + new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo), + InitialCandidateSync::new); + + LOG.debug("CandidateRegistry syncing behavior started."); + } + + public static Behavior create() { + return Behaviors.withStash(100, + stash -> + Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter( + (ReplicatorMessageAdapter>> replicatorAdapter) -> + new CandidateRegistryInit(ctx, stash, replicatorAdapter)))); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(InitialCandidateSync.class, this::handleCandidateSync) + .onMessage(RegisterCandidate.class, this::stashCommand) + .onMessage(UnregisterCandidate.class, this::stashCommand) + .build(); + } + + private Behavior stashCommand(final CandidateRegistryCommand command) { + stash.stash(command); + return this; + } + + private Behavior handleCandidateSync(final InitialCandidateSync command) { + final Replicator.GetResponse>> response = command.getResponse(); + if (response instanceof Replicator.GetSuccess) { + clearExistingCandidates((Replicator.GetSuccess>>) response); + } + // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure + // from distributed data + return switchToCandidateRegistry(); + } + + private void clearExistingCandidates(final Replicator.GetSuccess>> response) { + final Map> entitiesToCandidates = response.get(response.key()).getEntries(); + + for (Map.Entry> entry : entitiesToCandidates.entrySet()) { + if (entry.getValue().getElements().contains(selfRole)) { + LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}", + selfRole, entry.getKey(), entry.getValue().getElements()); + clearRegistration(entry.getKey()); + } + } + } + + private void clearRegistration(final DOMEntity entity) { + candidateReplicator.askUpdate( + askReplyTo -> new Replicator.Update<>( + CandidateRegistry.KEY, + ORMap.empty(), + Replicator.writeLocal(), + askReplyTo, + map -> map.update(node, entity, ORSet.empty(), + value -> value.remove(node, selfRole))), + InternalUpdateResponse::new); + } + + private Behavior switchToCandidateRegistry() { + LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry."); + return stash.unstashAll(CandidateRegistry.create()); + } + + private static String extractRole(final Set roles) { + return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX)) + .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found.")); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java new file mode 100644 index 0000000000..2760d2e7e0 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/candidate/command/InitialCandidateSync.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.registry.candidate.command; + +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class InitialCandidateSync extends CandidateRegistryCommand { + + private final Replicator.GetResponse>> response; + + public InitialCandidateSync(final Replicator.GetResponse>> response) { + this.response = response; + } + + public Replicator.GetResponse>> getResponse() { + return response; + } +} -- 2.36.6