final String role = Cluster.get(context.getSystem()).selfMember().getRoles().iterator().next();
listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
- candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry");
final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
// start the initial sync behavior that switches to the regular one after syncing
ownerSupervisor = clusterSingleton.init(
SingletonActor.of(IdleSupervisor.create(iidCodec), "OwnerSupervisor"));
+ candidateRegistry = context.spawn(CandidateRegistryInit.create(ownerSupervisor), "CandidateRegistry");
+
ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, iidCodec),
"OwnerStateChecker");
}
--- /dev/null
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+
+abstract class AbstractSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+
+ final ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+
+ AbstractSupervisor(final ActorContext<OwnerSupervisorCommand> context) {
+ super(context);
+
+ final ActorRef<Replicator.Command> replicator = DistributedData.get(getContext().getSystem()).replicator();
+ candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, Duration.ofSeconds(5));
+ }
+
+ Behavior<OwnerSupervisorCommand> onClearCandidatesForMember(final ClearCandidatesForMember command) {
+ getLogger().debug("Clearing candidates for member: {}", command.getCandidate());
+
+ candidateReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY,
+ new Replicator.ReadMajority(Duration.ofSeconds(15)), askReplyTo),
+ response -> new ClearCandidates(response, command));
+
+ return this;
+ }
+
+ Behavior<OwnerSupervisorCommand> finishClearCandidates(final ClearCandidates command) {
+ if (command.getResponse() instanceof Replicator.GetSuccess) {
+ getLogger().debug("Retrieved candidate data, clearing candidates for {}",
+ command.getOriginalMessage().getCandidate());
+
+ getContext().spawnAnonymous(CandidateCleaner.create()).tell(command);
+ } else {
+ getLogger().debug("Unable to retrieve candidate data for {}, no candidates present sending empty reply",
+ command.getOriginalMessage().getCandidate());
+ command.getOriginalMessage().getReplyTo().tell(new ClearCandidatesResponse());
+ }
+
+ return this;
+ }
+
+ abstract Logger getLogger();
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.SelfUniqueAddress;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import java.util.Map;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesUpdateResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Actor that can be spawned by all the supervisor implementations that executes clearing of candidates once
+ * candidate retrieval succeeds. Once candidates for the member are cleared(or immediately if none need to be cleared),
+ * the actor stops itself.
+ */
+public final class CandidateCleaner extends AbstractBehavior<OwnerSupervisorCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(CandidateCleaner.class);
+
+ private final ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+ private final SelfUniqueAddress node;
+
+ private int remaining = 0;
+
+ private CandidateCleaner(final ActorContext<OwnerSupervisorCommand> context) {
+ super(context);
+
+ final ActorRef<Replicator.Command> replicator = DistributedData.get(getContext().getSystem()).replicator();
+ candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, Duration.ofSeconds(5));
+ node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+
+ }
+
+ public static Behavior<OwnerSupervisorCommand> create() {
+ return Behaviors.setup(CandidateCleaner::new);
+ }
+
+ @Override
+ public Receive<OwnerSupervisorCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(ClearCandidates.class, this::onClearCandidates)
+ .onMessage(ClearCandidatesUpdateResponse.class, this::onClearCandidatesUpdateResponse)
+ .build();
+ }
+
+ private Behavior<OwnerSupervisorCommand> onClearCandidates(final ClearCandidates command) {
+ LOG.debug("Clearing candidates for member: {}", command.getOriginalMessage().getCandidate());
+
+ final ORMap<DOMEntity, ORSet<String>> candidates =
+ ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) command.getResponse())
+ .get(CandidateRegistry.KEY);
+
+ for (final Map.Entry<DOMEntity, ORSet<String>> entry : candidates.getEntries().entrySet()) {
+ if (entry.getValue().contains(command.getOriginalMessage().getCandidate())) {
+ LOG.debug("Removing {} from {}", command.getOriginalMessage().getCandidate(), entry.getKey());
+
+ remaining++;
+ candidateReplicator.askUpdate(
+ askReplyTo -> new Replicator.Update<>(
+ CandidateRegistry.KEY,
+ ORMap.empty(),
+ new Replicator.WriteMajority(Duration.ofSeconds(10)),
+ askReplyTo,
+ map -> map.update(node, entry.getKey(), ORSet.empty(),
+ value -> value.remove(node, command.getOriginalMessage().getCandidate()))),
+ updateResponse -> new ClearCandidatesUpdateResponse(updateResponse,
+ command.getOriginalMessage().getReplyTo()));
+ }
+ }
+
+ if (remaining == 0) {
+ LOG.debug("Did not clear any candidates for {}", command.getOriginalMessage().getCandidate());
+ command.getOriginalMessage().getReplyTo().tell(new ClearCandidatesResponse());
+ return Behaviors.stopped();
+ }
+ return this;
+ }
+
+ private Behavior<OwnerSupervisorCommand> onClearCandidatesUpdateResponse(
+ final ClearCandidatesUpdateResponse command) {
+ remaining--;
+ if (remaining == 0) {
+ LOG.debug("Last update response for candidate removal received, replying to: {}", command.getReplyTo());
+ command.getReplyTo().tell(new ClearCandidatesResponse());
+ return Behaviors.stopped();
+ } else {
+ LOG.debug("Have still {} outstanding requests after {}", remaining, command.getResponse());
+ }
+ return this;
+ }
+}
import static java.util.Objects.requireNonNull;
import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.typed.Cluster;
import akka.pattern.StatusReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
* in the primary datacenter, or is activated on demand. Once the supervisor instance is no longer needed in the
* secondary datacenter it needs to be deactivated manually.
*/
-public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class IdleSupervisor extends AbstractSupervisor {
private static final Logger LOG = LoggerFactory.getLogger(IdleSupervisor.class);
private static final String DATACENTER_PREFIX = "dc-";
}
public static Behavior<OwnerSupervisorCommand> create(final BindingInstanceIdentifierCodec iidCodec) {
-
return Behaviors.setup(context -> new IdleSupervisor(context, iidCodec));
}
.onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
.onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
.onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+ .onMessage(ClearCandidates.class, this::finishClearCandidates)
.build();
}
return OwnerSyncer.create(message.getReplyTo(), iidCodec);
}
- private String extractDatacenterRole(final Member selfMember) {
+ private static String extractDatacenterRole(final Member selfMember) {
return selfMember.getRoles().stream()
.filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(selfMember + " does not have a valid role"));
}
+
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import java.util.stream.StreamSupport;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
* registry in distributed-data and picks entity owners based on the current cluster state and registered candidates.
* On cluster up/down etc. events the owners are reassigned if possible.
*/
-public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSupervisor extends AbstractSupervisor {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
private static final String DATACENTER_PREFIX = "dc-";
});
cluster.subscriptions().tell(Subscribe.create(reachabilityEventAdapter, ClusterEvent.ReachabilityEvent.class));
- new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
- Duration.ofSeconds(5)).subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
+ candidateReplicator.subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
LOG.debug("Owner Supervisor started");
}
.onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
.onMessage(GetEntityBackendRequest.class, this::onGetEntity)
.onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
+ .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+ .onMessage(ClearCandidates.class, this::finishClearCandidates)
.build();
}
private void reassignUnreachableOwners() {
final Set<String> ownersToReassign = new HashSet<>();
for (final String owner : ownerToEntity.keys()) {
- if (!activeMembers.contains(owner)) {
+ if (!isActiveCandidate(owner)) {
ownersToReassign.add(owner);
}
}
LOG.debug("Adding new candidate for entity: {} : {}", entity, toCheck);
currentCandidates.get(entity).add(toCheck);
- if (!currentOwners.containsKey(entity)) {
- // might as well assign right away when we don't have an owner
+ final String currentOwner = currentOwners.get(entity);
+
+ if (currentOwner == null || !activeMembers.contains(currentOwner)) {
+ // might as well assign right away when we don't have an owner or its unreachable
assignOwnerFor(entity);
}
LOG.debug("Reassigning owners for {}", entities);
for (final DOMEntity entity : entities) {
if (predicate.test(entity, oldOwner)) {
+
+ if (!isActiveCandidate(oldOwner) && isCandidateFor(entity, oldOwner) && hasSingleCandidate(entity)) {
+ // only skip new owner assignment, only if unreachable, still is a candidate and is the ONLY
+ // candidate
+ LOG.debug("{} is the only candidate for {}. Skipping reassignment.", oldOwner, entity);
+ continue;
+ }
ownerToEntity.remove(oldOwner, entity);
assignOwnerFor(entity);
}
return currentCandidates.getOrDefault(entity, Set.of()).contains(candidate);
}
+ private boolean hasSingleCandidate(final DOMEntity entity) {
+ return currentCandidates.getOrDefault(entity, Set.of()).size() == 1;
+ }
+
private void assignOwnerFor(final DOMEntity entity) {
final Set<String> candidatesForEntity = currentCandidates.get(entity);
if (candidatesForEntity.isEmpty()) {
return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
.findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
}
+
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import java.util.Map;
import java.util.Set;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
* Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
* sync has finished.
*/
-public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSyncer extends AbstractSupervisor {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
- new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
- Duration.ofSeconds(5)).askGet(
+ candidateReplicator.askGet(
askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
InitialCandidateSync::new);
.onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
.onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
.onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+ .onMessage(ClearCandidates.class, this::finishClearCandidates)
.build();
}
private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
LOG.debug("Owner not found. {}", rsp);
}
+
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.eos.akka.registry.candidate.command;
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.typed.javadsl.Replicator;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
-public class InitialCandidateSync extends CandidateRegistryCommand {
+public class ClearCandidates extends OwnerSupervisorCommand {
private final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+ private final ClearCandidatesForMember originalMessage;
- public InitialCandidateSync(final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response) {
+ public ClearCandidates(final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response,
+ final ClearCandidatesForMember originalMessage) {
this.response = response;
+ this.originalMessage = originalMessage;
}
public Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
return response;
}
+
+ public ClearCandidatesForMember getOriginalMessage() {
+ return originalMessage;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import java.io.Serializable;
+
+/**
+ * Request sent from Candidate registration actors to clear the candidate from all entities. Issued at start to clear
+ * candidates from previous iteration of a node. Owner supervisor responds to this request to notify the registration
+ * actor it can start up and process candidate requests.
+ */
+public class ClearCandidatesForMember extends OwnerSupervisorCommand implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ActorRef<ClearCandidatesResponse> replyTo;
+ private final String candidate;
+
+ public ClearCandidatesForMember(final ActorRef<ClearCandidatesResponse> replyTo, final String candidate) {
+ this.replyTo = replyTo;
+ this.candidate = candidate;
+ }
+
+ public ActorRef<ClearCandidatesResponse> getReplyTo() {
+ return replyTo;
+ }
+
+ public String getCandidate() {
+ return candidate;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+/**
+ * Response sent from OwnerSupervisor to the ClearCandidatesForMember request, notifying the caller that removal has
+ * finished.
+ */
+public class ClearCandidatesResponse implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class ClearCandidatesUpdateResponse extends OwnerSupervisorCommand {
+ private final Replicator.UpdateResponse<ORMap<DOMEntity, ORSet<String>>> response;
+ private final ActorRef<ClearCandidatesResponse> replyTo;
+
+ public ClearCandidatesUpdateResponse(final Replicator.UpdateResponse<ORMap<DOMEntity, ORSet<String>>> response,
+ final ActorRef<ClearCandidatesResponse> replyTo) {
+ this.response = response;
+ this.replyTo = replyTo;
+ }
+
+ public Replicator.UpdateResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+ return response;
+ }
+
+
+ public ActorRef<ClearCandidatesResponse> getReplyTo() {
+ return replyTo;
+ }
+}
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
+import akka.cluster.Cluster;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORMapKey;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.util.Set;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
private static final Logger LOG = LoggerFactory.getLogger(CandidateRegistry.class);
+ private static final String DATACENTER_PREFIX = "dc-";
+
public static final Key<ORMap<DOMEntity, ORSet<String>>> KEY = new ORMapKey<>("candidateRegistry");
private final ReplicatorMessageAdapter<CandidateRegistryCommand, ORMap<DOMEntity, ORSet<String>>> replicatorAdapter;
private final SelfUniqueAddress node;
+ private final String selfRole;
private CandidateRegistry(final ActorContext<CandidateRegistryCommand> context,
final ReplicatorMessageAdapter<CandidateRegistryCommand,
this.replicatorAdapter = replicatorAdapter;
this.node = DistributedData.get(context.getSystem()).selfUniqueAddress();
+ this.selfRole = extractRole(Cluster.get(context.getSystem()).selfMember().getRoles());
- LOG.debug("Candidate registry started");
+ LOG.debug("{} : Candidate registry started", selfRole);
}
public static Behavior<CandidateRegistryCommand> create() {
}
private Behavior<CandidateRegistryCommand> onRegisterCandidate(final RegisterCandidate registerCandidate) {
- LOG.debug("Registering candidate({}) for entity: {}",
+ LOG.debug("{} - Registering candidate({}) for entity: {}", selfRole,
registerCandidate.getCandidate(), registerCandidate.getEntity());
replicatorAdapter.askUpdate(
askReplyTo -> new Replicator.Update<>(
}
private Behavior<CandidateRegistryCommand> onUnregisterCandidate(final UnregisterCandidate unregisterCandidate) {
- LOG.debug("Removing candidate({}) from entity: {}",
+ LOG.debug("{} - Removing candidate({}) from entity: {}", selfRole,
unregisterCandidate.getCandidate(), unregisterCandidate.getEntity());
replicatorAdapter.askUpdate(
askReplyTo -> new Replicator.Update<>(
}
private Behavior<CandidateRegistryCommand> onInternalUpdateResponse(final InternalUpdateResponse updateResponse) {
- LOG.debug("Received update response: {}", updateResponse.getRsp());
+ LOG.debug("{} : Received update response: {}", selfRole, updateResponse.getRsp());
return this;
}
+
+ private static String extractRole(final Set<String> roles) {
+ return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+ }
}
*/
package org.opendaylight.controller.eos.akka.registry.candidate;
+import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.javadsl.StashBuffer;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.ORSet;
-import akka.cluster.ddata.SelfUniqueAddress;
-import akka.cluster.ddata.typed.javadsl.DistributedData;
-import akka.cluster.ddata.typed.javadsl.Replicator;
-import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
-import akka.cluster.typed.Cluster;
+import akka.cluster.Cluster;
import java.time.Duration;
-import java.util.Map;
import java.util.Set;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesResponse;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InitialCandidateSync;
-import org.opendaylight.controller.eos.akka.registry.candidate.command.InternalUpdateResponse;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFailed;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRemovalFinished;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.candidate.command.RemovePreviousCandidates;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final String DATACENTER_PREFIX = "dc-";
private final StashBuffer<CandidateRegistryCommand> stash;
- private final ReplicatorMessageAdapter<CandidateRegistryCommand,
- ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
private final String selfRole;
- private final SelfUniqueAddress node;
public CandidateRegistryInit(final ActorContext<CandidateRegistryCommand> ctx,
final StashBuffer<CandidateRegistryCommand> stash,
- final ReplicatorMessageAdapter<CandidateRegistryCommand,
- ORMap<DOMEntity, ORSet<String>>> candidateReplicator) {
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
super(ctx);
this.stash = stash;
- this.candidateReplicator = candidateReplicator;
- selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
+ this.ownerSupervisor = ownerSupervisor;
+ this.selfRole = extractRole(Cluster.get(ctx.getSystem()).selfMember().getRoles());
- this.node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();
+ ctx.getSelf().tell(new RemovePreviousCandidates());
-
- this.candidateReplicator.askGet(
- askReplyTo -> new Replicator.Get<>(
- CandidateRegistry.KEY,
- new Replicator.ReadAll(Duration.ofSeconds(15)), askReplyTo),
- InitialCandidateSync::new);
-
- LOG.debug("CandidateRegistry syncing behavior started.");
+ LOG.debug("{} : CandidateRegistry syncing behavior started.", selfRole);
}
- public static Behavior<CandidateRegistryCommand> create() {
+ public static Behavior<CandidateRegistryCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor) {
return Behaviors.withStash(100,
stash ->
- Behaviors.setup(ctx -> DistributedData.withReplicatorMessageAdapter(
- (ReplicatorMessageAdapter<CandidateRegistryCommand,
- ORMap<DOMEntity, ORSet<String>>> replicatorAdapter) ->
- new CandidateRegistryInit(ctx, stash, replicatorAdapter))));
+ Behaviors.setup(ctx -> new CandidateRegistryInit(ctx, stash, ownerSupervisor)));
}
@Override
public Receive<CandidateRegistryCommand> createReceive() {
return newReceiveBuilder()
- .onMessage(InitialCandidateSync.class, this::handleCandidateSync)
+ .onMessage(RemovePreviousCandidates.class, this::onRemoveCandidates)
+ .onMessage(CandidateRemovalFinished.class, command -> switchToCandidateRegistry())
+ .onMessage(CandidateRemovalFailed.class, this::candidateRemovalFailed)
.onMessage(RegisterCandidate.class, this::stashCommand)
.onMessage(UnregisterCandidate.class, this::stashCommand)
.build();
}
- private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
- stash.stash(command);
+ private Behavior<CandidateRegistryCommand> candidateRemovalFailed(final CandidateRemovalFailed command) {
+ LOG.warn("{} : Initial removal of candidates from previous iteration failed. Rescheduling.", selfRole,
+ command.getThrowable());
+ getContext().getSelf().tell(new RemovePreviousCandidates());
return this;
}
- private Behavior<CandidateRegistryCommand> handleCandidateSync(final InitialCandidateSync command) {
- final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = command.getResponse();
- if (response instanceof Replicator.GetSuccess) {
- clearExistingCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
- }
- // TODO implement other cases if needed, seems like only a retry would be needed here when we get a failure
- // from distributed data
- return switchToCandidateRegistry();
- }
-
- private void clearExistingCandidates(final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
- final Map<DOMEntity, ORSet<String>> entitiesToCandidates = response.get(response.key()).getEntries();
+ private Behavior<CandidateRegistryCommand> onRemoveCandidates(final RemovePreviousCandidates command) {
+ LOG.debug("Sending RemovePreviousCandidates.");
+ getContext().ask(ClearCandidatesResponse.class,
+ ownerSupervisor, Duration.ofSeconds(5),
+ ref -> new ClearCandidatesForMember(ref, selfRole),
+ (response, throwable) -> {
+ if (response != null) {
+ return new CandidateRemovalFinished();
+ } else {
+ return new CandidateRemovalFailed(throwable);
+ }
+ });
- for (Map.Entry<DOMEntity, ORSet<String>> entry : entitiesToCandidates.entrySet()) {
- if (entry.getValue().getElements().contains(selfRole)) {
- LOG.debug("Clearing candidate: {} from entity: {}, current state of entity candidates: {}",
- selfRole, entry.getKey(), entry.getValue().getElements());
- clearRegistration(entry.getKey());
- }
- }
+ return this;
}
- private void clearRegistration(final DOMEntity entity) {
- candidateReplicator.askUpdate(
- askReplyTo -> new Replicator.Update<>(
- CandidateRegistry.KEY,
- ORMap.empty(),
- Replicator.writeLocal(),
- askReplyTo,
- map -> map.update(node, entity, ORSet.empty(),
- value -> value.remove(node, selfRole))),
- InternalUpdateResponse::new);
+ private Behavior<CandidateRegistryCommand> stashCommand(final CandidateRegistryCommand command) {
+ LOG.debug("Stashing {}", command);
+ stash.stash(command);
+ return this;
}
private Behavior<CandidateRegistryCommand> switchToCandidateRegistry() {
- LOG.debug("Clearing of candidates from previous instance done, switching to CandidateRegistry.");
+ LOG.debug("{} : Clearing of candidates from previous instance done, switching to CandidateRegistry.", selfRole);
return stash.unstashAll(CandidateRegistry.create());
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+public class CandidateRemovalFailed extends CandidateRegistryCommand {
+
+ private final Throwable throwable;
+
+ public CandidateRemovalFailed(final Throwable throwable) {
+ this.throwable = throwable;
+ }
+
+ public Throwable getThrowable() {
+ return throwable;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+public class CandidateRemovalFinished extends CandidateRegistryCommand {
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.candidate.command;
+
+/**
+ * Message sent to candidate registry initial behavior by self to trigger and retrigger(in case of failures) removal
+ * of candidates registered by the previous iteration of this node.
+ */
+public class RemovePreviousCandidates extends CandidateRegistryCommand {
+}
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import com.typesafe.config.Config;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
}
protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
- await().until(() -> {
+ await().atMost(Duration.ofSeconds(15)).until(() -> {
final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
AskPattern.ask(distributedData.replicator(),
});
}
+ protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
+ final String candidate) {
+ await().atMost(Duration.ofSeconds(15)).until(() -> {
+ final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
+
+ final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
+ AskPattern.ask(distributedData.replicator(),
+ replyTo -> new Replicator.Get<>(
+ CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
+ Duration.ofSeconds(5),
+ clusterNode.getActorSystem().scheduler());
+
+ final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
+ ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ if (response instanceof Replicator.GetSuccess) {
+ final Map<DOMEntity, ORSet<String>> entries =
+ ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
+
+ return entries.get(entity).contains(candidate);
+
+ }
+ return false;
+ });
+ }
+
protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
final CompletionStage<OwnerSupervisorReply> ask =
AskPattern.ask(clusterNode.getOwnerSupervisor(),
final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
await().until(() -> !listener.getChanges().isEmpty());
- await().untilAsserted(() -> {
+ await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
final List<DOMEntityOwnershipChange> changes = listener.getChanges();
final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
assertEquals(entity, domEntityOwnershipChange.getEntity());
public void testDatacenterActivation() throws Exception {
registerCandidates(node1, ENTITY_1, "member-1");
registerCandidates(node3, ENTITY_1, "member-3");
- registerCandidates(node4, ENTITY_1, "member-4");
activateDatacenter(node1).get();
verifyListenerState(listener1, ENTITY_1, true, false, false);
verifyListenerState(listener2, ENTITY_1, true, true, false);
+ registerCandidates(node4, ENTITY_1, "member-4");
unregisterCandidates(node3, ENTITY_1, "member-3");
// checking index after notif so current + 1
verifyListenerState(listener1, ENTITY_1, true, false, false);
- verifyListenerState(listener2, ENTITY_1, true, false, true);
+ verifyListenerState(listener2, ENTITY_1, true, false, false);
deactivateDatacenter(node3).get();
activateDatacenter(node2).get();
-
- // no candidate in dc-primary so no owners after datacenter activation
- verifyListenerState(listener1, ENTITY_1, false, false, false);
- verifyListenerState(listener2, ENTITY_1, false, false, false);
}
@Test
registerCandidates(node3, ENTITY_1, "member-3");
registerCandidates(node4, ENTITY_1, "member-4");
+ waitUntillCandidatePresent(node1, ENTITY_1, "member-1");
+ waitUntillCandidatePresent(node1, ENTITY_1, "member-3");
+ waitUntillCandidatePresent(node1, ENTITY_1, "member-4");
+
activateDatacenter(node1).get();
- waitUntillOwnerPresent(node1, ENTITY_1);
+ waitUntillOwnerPresent(node4, ENTITY_1);
final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
verifyListenerState(listener1, ENTITY_1, true, true, false);
activateDatacenter(node3).get();
verifyListenerState(listener2, ENTITY_1, true, true, false);
+ waitUntillOwnerPresent(node3, ENTITY_1);
unregisterCandidates(node3, ENTITY_1, "member-3");
verifyListenerState(listener2, ENTITY_1, true, false, true);
}
verifyListenerState(node1Listener, ENTITY_1, true, false, false);
}
+ @Test
+ public void testOwnerNotReassignedWhenOnlyCandidate() throws Exception {
+ startNode3();
+ final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+ final MockEntityOwnershipListener listener2 = registerListener(node2, ENTITY_1);
+ verifyNoNotifications(listener1);
+ verifyNoNotifications(listener2);
+
+ registerCandidates(node3, ENTITY_1, "member-3");
+ waitUntillOwnerPresent(node1, ENTITY_1);
+
+ MockEntityOwnershipListener listener3 = registerListener(node3, ENTITY_1);
+ verifyListenerState(listener1, ENTITY_1, true, false, false);
+ verifyListenerState(listener3, ENTITY_1, true, true, false);
+
+ ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+
+ verifyListenerState(listener1, ENTITY_1, true, false, false);
+ verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+ startNode3();
+ verifyListenerState(listener1, ENTITY_1, false, false, false);
+
+ listener3 = registerListener(node3, ENTITY_1);
+ verifyListenerState(listener3, ENTITY_1, false, false, false);
+
+ registerCandidates(node1, ENTITY_1, "member-1");
+
+ verifyListenerState(listener1, ENTITY_1, true, true, false);
+ verifyListenerState(listener3, ENTITY_1, true, false, false);
+
+ }
+
private void startNode3() throws Exception {
startNode3(3);
}
// need to wait until all nodes are ready
final Cluster cluster = Cluster.get(node2.getActorSystem());
- await().atMost(Duration.ofSeconds(20)).until(() -> {
+ await().atMost(Duration.ofSeconds(30)).until(() -> {
final List<Member> members = ImmutableList.copyOf(cluster.state().getMembers());
if (members.size() != membersPresent) {
return false;
# This value controls how quickly Entity Ownership Service decisions are
# propagated within a node.
notify-subscribers-interval = 20 ms
- }
+ }
+ split-brain-resolver {
+ active-strategy = keep-majority
+ stable-after = 7s
+ }
}
}