import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
}
-
@Override
public ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
- return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
+ return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
reply -> reply.toOutput(iidCodec));
}
@Override
public ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
- return toRpcFuture(AskPattern.ask(ownerSupervisor,
+ return toRpcFuture(AskPattern.ask(ownerStateChecker,
(final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
GetEntityReply::toOutput);
}
@Override
public ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
- return toRpcFuture(AskPattern.ask(ownerSupervisor,
+ return toRpcFuture(AskPattern.ask(ownerStateChecker,
(final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
scheduler), GetEntityOwnerReply::toOutput);
}
return runningContext;
}
- private static <R extends OwnerSupervisorReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
+ private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
final CompletionStage<R> stage, final Function<R, O> outputFunction) {
final SettableFuture<RpcResult<O>> future = SettableFuture.create();
listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry");
- ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
// start the initial sync behavior that switches to the regular one after syncing
ownerSupervisor = clusterSingleton.init(
SingletonActor.of(IdleSupervisor.create(iidCodec), "OwnerSupervisor"));
+
+ ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, iidCodec),
+ "OwnerStateChecker");
}
public static Behavior<BootstrapCommand> create(final BindingInstanceIdentifierCodec iidCodec) {
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker;
+
+import static com.google.common.base.Verify.verifyNotNull;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.eos.akka.owner.checker.command.AbstractEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidates;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidatesForEntity;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnerForEntity;
+import org.opendaylight.controller.eos.akka.owner.checker.command.OwnerDataResponse;
+import org.opendaylight.controller.eos.akka.owner.checker.command.SingleEntityOwnerDataResponse;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Short-lived actor that is spawned purely for execution of rpcs from the entity-owners model.
+ */
+public final class EntityRpcHandler extends AbstractBehavior<StateCheckerCommand> {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityRpcHandler.class);
+ private static final Duration ASK_TIMEOUT = Duration.ofSeconds(5);
+
+ private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
+ private final ReplicatorMessageAdapter<StateCheckerCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+ private final ActorRef<Replicator.Command> replicator;
+
+ private final BindingInstanceIdentifierCodec iidCodec;
+
+ private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
+ private final Map<DOMEntity, String> currentOwners = new HashMap<>();
+ private final Map<String, DOMEntity> entityLookup = new HashMap<>();
+ private int toSync = -1;
+
+ public EntityRpcHandler(final ActorContext<StateCheckerCommand> context,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final BindingInstanceIdentifierCodec iidCodec) {
+ super(context);
+
+ replicator = DistributedData.get(context.getSystem()).replicator();
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, ASK_TIMEOUT);
+ candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, ASK_TIMEOUT);
+ this.ownerSupervisor = ownerSupervisor;
+
+ this.iidCodec = iidCodec;
+ }
+
+ public static Behavior<StateCheckerCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final BindingInstanceIdentifierCodec iidCodec) {
+ return Behaviors.setup(ctx -> new EntityRpcHandler(ctx, ownerSupervisor, iidCodec));
+ }
+
+ @Override
+ public Receive<StateCheckerCommand> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(GetEntitiesRequest.class, this::onGetEntities)
+ .onMessage(GetEntityRequest.class, this::onGetEntity)
+ .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
+ .onMessage(GetCandidates.class, this::onCandidatesReceived)
+ .onMessage(GetCandidatesForEntity.class, this::onCandidatesForEntityReceived)
+ .onMessage(OwnerDataResponse.class, this::onOwnerDataReceived)
+ .onMessage(SingleEntityOwnerDataResponse.class, this::onSingleOwnerReceived)
+ .onMessage(GetOwnerForEntity.class, this::onReplyWithOwner)
+ .build();
+ }
+
+ private Behavior<StateCheckerCommand> onGetEntities(final GetEntitiesRequest request) {
+ LOG.debug("{} : Executing get-entities rpc.", getContext().getSelf());
+ final CompletionStage<GetEntitiesBackendReply> result = AskPattern.askWithStatus(
+ ownerSupervisor,
+ GetEntitiesBackendRequest::new,
+ ASK_TIMEOUT,
+ getContext().getSystem().scheduler()
+ );
+
+ result.whenComplete((response, throwable) -> {
+ if (response != null) {
+ request.getReplyTo().tell(new GetEntitiesReply(response));
+ } else {
+ // retry backed with distributed-data
+ LOG.debug("{} : Get-entities failed with owner supervisor, falling back to distributed-data.",
+ getContext().getSelf(), throwable);
+ getCandidates(request.getReplyTo());
+ }
+ });
+ return this;
+ }
+
+ private Behavior<StateCheckerCommand> onGetEntity(final GetEntityRequest request) {
+ LOG.debug("{} : Executing get-entity rpc.", getContext().getSelf());
+ final CompletionStage<GetEntityBackendReply> result = AskPattern.askWithStatus(
+ ownerSupervisor,
+ replyTo -> new GetEntityBackendRequest(replyTo, request.getEntity()),
+ ASK_TIMEOUT,
+ getContext().getSystem().scheduler()
+ );
+
+ result.whenComplete((response, throwable) -> {
+ if (response != null) {
+ request.getReplyTo().tell(new GetEntityReply(response));
+ } else {
+ // retry backed with distributed-data
+ LOG.debug("{} : Get-entity failed with owner supervisor, falling back to distributed-data.",
+ getContext().getSelf(), throwable);
+ getCandidatesForEntity(extractEntity(request), request.getReplyTo());
+ }
+ });
+ return this;
+ }
+
+ private Behavior<StateCheckerCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
+ LOG.debug("{} : Executing get-entity-owner rpc.", getContext().getSelf());
+ final CompletionStage<GetEntityOwnerBackendReply> result = AskPattern.askWithStatus(
+ ownerSupervisor,
+ replyTo -> new GetEntityOwnerBackendRequest(replyTo, request.getEntity()),
+ ASK_TIMEOUT,
+ getContext().getSystem().scheduler()
+ );
+
+ result.whenComplete((response, throwable) -> {
+ if (response != null) {
+ request.getReplyTo().tell(new GetEntityOwnerReply(response.getOwner()));
+ } else {
+ // retry backed with distributed-data
+ LOG.debug("{} : Get-entity-owner failed with owner supervisor, falling back to distributed-data.",
+ getContext().getSelf(), throwable);
+ getOwnerForEntity(extractEntity(request), request.getReplyTo());
+ }
+ });
+ return this;
+ }
+
+ private void getCandidates(final ActorRef<GetEntitiesReply> replyTo) {
+ candidateReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
+ replicatorResponse -> new GetCandidates(replicatorResponse, replyTo));
+ }
+
+ private void getCandidatesForEntity(final DOMEntity entity, final ActorRef<GetEntityReply> replyTo) {
+ candidateReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
+ replicatorResponse -> new GetCandidatesForEntity(replicatorResponse, entity, replyTo));
+ }
+
+ private void getOwnerForEntity(final DOMEntity entity, final ActorRef<GetEntityOwnerReply> replyTo) {
+ ownerReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(
+ new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), askReplyTo),
+ replicatorReponse -> new GetOwnerForEntity(replicatorReponse, entity, replyTo));
+ }
+
+ private Behavior<StateCheckerCommand> onReplyWithOwner(final GetOwnerForEntity message) {
+ final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
+ if (response instanceof Replicator.GetSuccess) {
+ message.getReplyTo().tell(new GetEntityOwnerReply(
+ ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue()));
+ } else {
+ LOG.debug("Unable to retrieve owner for entity: {}, response: {}", message.getEntity(), response);
+ message.getReplyTo().tell(new GetEntityOwnerReply(""));
+ }
+
+ return Behaviors.stopped();
+ }
+
+ private Behavior<StateCheckerCommand> onCandidatesReceived(final GetCandidates message) {
+ final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
+ if (response instanceof Replicator.GetSuccess) {
+ return extractCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
+ message.getReplyTo());
+ }
+
+ LOG.debug("Unable to retrieve candidates from distributed-data. Response: {}", response);
+ message.getReplyTo().tell(new GetEntitiesReply(Collections.emptyMap(), Collections.emptyMap()));
+ return Behaviors.stopped();
+ }
+
+ private Behavior<StateCheckerCommand> extractCandidates(
+ final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response,
+ final ActorRef<GetEntitiesReply> replyTo) {
+ final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
+ candidates.getEntries().forEach((key, value) -> currentCandidates.put(key, new HashSet<>(value.getElements())));
+
+ toSync = candidates.keys().size();
+ for (final DOMEntity entity : candidates.keys().getElements()) {
+ entityLookup.put(entity.toString(), entity);
+
+ ownerReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(
+ new LWWRegisterKey<>(entity.toString()),
+ Replicator.readLocal(),
+ askReplyTo),
+ replicatorResponse -> new OwnerDataResponse(replicatorResponse, replyTo));
+ }
+
+ return this;
+ }
+
+ private Behavior<StateCheckerCommand> onOwnerDataReceived(final OwnerDataResponse message) {
+ final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
+ if (response instanceof Replicator.GetSuccess) {
+ handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
+ } else if (response instanceof Replicator.NotFound) {
+ handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
+ } else {
+ LOG.debug("Owner retrieval failed, response: {}", response);
+ }
+
+ // count the responses, on last respond to rpc and shutdown
+ toSync--;
+ if (toSync == 0) {
+ final GetEntitiesReply getEntitiesReply = new GetEntitiesReply(currentCandidates, currentOwners);
+ message.getReplyTo().tell(getEntitiesReply);
+ return Behaviors.stopped();
+ }
+
+ return this;
+ }
+
+ private Behavior<StateCheckerCommand> onCandidatesForEntityReceived(final GetCandidatesForEntity message) {
+ LOG.debug("Received CandidatesForEntity: {}", message);
+ final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
+ if (response instanceof Replicator.GetSuccess) {
+ return extractCandidatesForEntity((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
+ message.getEntity(), message.getReplyTo());
+ } else {
+ LOG.debug("Unable to retrieve candidates for entity: {}. Response:: {}", message.getEntity(), response);
+ message.getReplyTo().tell(new GetEntityReply(null, Collections.emptySet()));
+ return this;
+ }
+ }
+
+ private Behavior<StateCheckerCommand> extractCandidatesForEntity(
+ final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response, final DOMEntity entity,
+ final ActorRef<GetEntityReply> replyTo) {
+ final Map<DOMEntity, ORSet<String>> entries = response.get(CandidateRegistry.KEY).getEntries();
+ currentCandidates.put(entity, entries.get(entity).getElements());
+
+ entityLookup.put(entity.toString(), entity);
+ ownerReplicator.askGet(
+ askReplyTo -> new Replicator.Get<>(
+ new LWWRegisterKey<>(entity.toString()),
+ Replicator.readLocal(),
+ askReplyTo),
+ replicatorResponse -> new SingleEntityOwnerDataResponse(replicatorResponse, entity, replyTo));
+
+ return this;
+ }
+
+ private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
+ final DOMEntity entity = entityLookup.get(rsp.key().id());
+ final String owner = rsp.get(rsp.key()).getValue();
+
+ currentOwners.put(entity, owner);
+ }
+
+ private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
+ LOG.debug("Owner not found. {}", rsp);
+ }
+
+ private Behavior<StateCheckerCommand> onSingleOwnerReceived(final SingleEntityOwnerDataResponse message) {
+ LOG.debug("Received owner for single entity: {}", message);
+ final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
+ final GetEntityReply reply;
+ if (response instanceof Replicator.GetSuccess) {
+ reply = new GetEntityReply(((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue(),
+ currentCandidates.get(message.getEntity()));
+ } else {
+ reply = new GetEntityReply(null, currentCandidates.get(message.getEntity()));
+ }
+
+ message.getReplyTo().tell(reply);
+ return Behaviors.stopped();
+ }
+
+ private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+ final var name = request.getName();
+ final var iid = name.getInstanceIdentifier();
+ if (iid != null) {
+ return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
+ }
+ final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
+ return new DOMEntity(request.getType().getValue(), str);
+ }
+}
import static java.util.Objects.requireNonNull;
+import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.Replicator.Get;
import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
- private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
+ private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+ private final BindingInstanceIdentifierCodec iidCodec;
+ private final ActorRef<Replicator.Command> replicator;
private final String localMember;
- private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
+ private OwnerStateChecker(final ActorContext<StateCheckerCommand> context,
+ final String localMember,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final BindingInstanceIdentifierCodec iidCodec) {
super(context);
this.localMember = requireNonNull(localMember);
- replicatorAdapter = new ReplicatorMessageAdapter<>(context,
- DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
+ this.ownerSupervisor = requireNonNull(ownerSupervisor);
+ this.iidCodec = requireNonNull(iidCodec);
+ replicator = DistributedData.get(context.getSystem()).replicator();
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, UNEXPECTED_ASK_TIMEOUT);
}
- public static Behavior<StateCheckerCommand> create(final String localMember) {
- return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
+ public static Behavior<StateCheckerCommand> create(final String localMember,
+ final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+ final BindingInstanceIdentifierCodec iidCodec) {
+ return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember, ownerSupervisor, iidCodec));
}
@Override
return newReceiveBuilder()
.onMessage(GetOwnershipState.class, this::onGetOwnershipState)
.onMessage(InternalGetReply.class, this::respondWithState)
+ .onMessage(GetEntitiesRequest.class, this::executeEntityRpc)
+ .onMessage(GetEntityRequest.class, this::executeEntityRpc)
+ .onMessage(GetEntityOwnerRequest.class, this::executeEntityRpc)
.build();
}
private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
- replicatorAdapter.askGet(
+ ownerReplicator.askGet(
askReplyTo -> new Get<>(
new LWWRegisterKey<>(message.getEntity().toString()),
new ReadMajority(GET_OWNERSHIP_TIMEOUT),
}
return this;
}
+
+ private Behavior<StateCheckerCommand> executeEntityRpc(final StateCheckerRequest request) {
+ final ActorRef<StateCheckerCommand> rpcHandler =
+ getContext().spawnAnonymous(EntityRpcHandler.create(ownerSupervisor, iidCodec));
+
+ LOG.debug("Executing entity rpc: {} in actor: {}", request, rpcHandler);
+ rpcHandler.tell(request);
+ return this;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import akka.actor.typed.ActorRef;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesBuilder;
+
+public abstract class AbstractEntityRequest<T extends StateCheckerReply> extends StateCheckerRequest<T> {
+ private static final long serialVersionUID = 1L;
+
+ private final @NonNull EntityType type;
+ private final @NonNull EntityName name;
+
+ AbstractEntityRequest(final ActorRef<T> replyTo, final EntityId entity) {
+ super(replyTo);
+ type = entity.requireType();
+ name = entity.requireName();
+ }
+
+ public final @NonNull EntityId getEntity() {
+ return new EntitiesBuilder().setType(type).setName(name).build();
+ }
+
+ public final @NonNull EntityType getType() {
+ return type;
+ }
+
+ public final @NonNull EntityName getName() {
+ return name;
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("type", type).add("name", name).toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetCandidates extends StateCheckerCommand {
+ private final @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+ private final @NonNull ActorRef<GetEntitiesReply> replyTo;
+
+ public GetCandidates(final GetResponse<ORMap<DOMEntity, ORSet<String>>> response,
+ final ActorRef<GetEntitiesReply> replyTo) {
+ this.response = response;
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+ return response;
+ }
+
+ public @NonNull ActorRef<GetEntitiesReply> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetCandidatesForEntity extends StateCheckerCommand {
+ private final @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+ private final @NonNull DOMEntity entity;
+ private final @NonNull ActorRef<GetEntityReply> replyTo;
+
+ public GetCandidatesForEntity(final GetResponse<ORMap<DOMEntity, ORSet<String>>> response,
+ final DOMEntity entity, final ActorRef<GetEntityReply> replyTo) {
+ this.response = response;
+ this.entity = requireNonNull(entity);
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+ return response;
+ }
+
+ public @NonNull DOMEntity getEntity() {
+ return entity;
+ }
+
+ public @NonNull ActorRef<GetEntityReply> getReplyTo() {
+ return replyTo;
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
import static com.google.common.base.Verify.verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
-import com.google.common.collect.ImmutableSetMultimap.Builder;
import com.google.common.collect.Iterables;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-public final class GetEntitiesReply extends OwnerSupervisorReply implements Serializable {
+public final class GetEntitiesReply extends StateCheckerReply implements Serializable {
private static final long serialVersionUID = 1L;
private final ImmutableSetMultimap<DOMEntity, String> candidates;
private final ImmutableMap<DOMEntity, String> owners;
- public GetEntitiesReply(final Map<DOMEntity, String> owners, final Map<DOMEntity, Set<String>> candidates) {
- final Builder<DOMEntity, String> builder = ImmutableSetMultimap.builder();
- for (Entry<DOMEntity, Set<String>> entry : candidates.entrySet()) {
+ public GetEntitiesReply(final GetEntitiesBackendReply response) {
+ this.owners = response.getOwners();
+ this.candidates = response.getCandidates();
+ }
+
+ public GetEntitiesReply(final Map<DOMEntity, Set<String>> candidates, final Map<DOMEntity, String> owners) {
+ final ImmutableSetMultimap.Builder<DOMEntity, String> builder = ImmutableSetMultimap.builder();
+ for (Map.Entry<DOMEntity, Set<String>> entry : candidates.entrySet()) {
builder.putAll(entry.getKey(), entry.getValue());
}
this.candidates = builder.build();
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import akka.actor.typed.ActorRef;
+
+public final class GetEntitiesRequest extends StateCheckerRequest<GetEntitiesReply> {
+ private static final long serialVersionUID = 1L;
+
+ public GetEntitiesRequest(final ActorRef<GetEntitiesReply> replyTo) {
+ super(replyTo);
+ }
+
+ @Override
+ public String toString() {
+ return "GetEntitiesRequest{} " + super.toString();
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
import java.io.Serializable;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
-public final class GetEntityOwnerReply extends OwnerSupervisorReply implements Serializable {
+public final class GetEntityOwnerReply extends StateCheckerReply implements Serializable {
private static final long serialVersionUID = 1L;
private final String owner;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
import akka.actor.typed.ActorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
-public final class GetEntityReply extends OwnerSupervisorReply implements Serializable {
+public final class GetEntityReply extends StateCheckerReply implements Serializable {
private static final long serialVersionUID = 1L;
private final ImmutableSet<String> candidates;
private final String owner;
+ public GetEntityReply(final GetEntityBackendReply backendReply) {
+ candidates = backendReply.getCandidates();
+ owner = backendReply.getOwner();
+ }
+
public GetEntityReply(final @Nullable String owner, final @Nullable Set<String> candidates) {
this.owner = owner;
this.candidates = candidates == null ? ImmutableSet.of() : ImmutableSet.copyOf(candidates);
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
import akka.actor.typed.ActorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class GetOwnerForEntity extends StateCheckerCommand {
+ private final @NonNull GetResponse<LWWRegister<String>> response;
+ private final DOMEntity entity;
+ private final ActorRef<GetEntityOwnerReply> replyTo;
+
+ public GetOwnerForEntity(final @NonNull GetResponse<LWWRegister<String>> response,
+ final DOMEntity entity, final ActorRef<GetEntityOwnerReply> replyTo) {
+ this.response = response;
+ this.entity = entity;
+ this.replyTo = replyTo;
+ }
+
+ public GetResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+
+ public DOMEntity getEntity() {
+ return entity;
+ }
+
+ public ActorRef<GetEntityOwnerReply> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public class OwnerDataResponse extends StateCheckerCommand {
+ private final @NonNull GetResponse<LWWRegister<String>> response;
+ private final ActorRef<GetEntitiesReply> replyTo;
+
+ public OwnerDataResponse(final GetResponse<LWWRegister<String>> response,
+ final ActorRef<GetEntitiesReply> replyTo) {
+ this.response = requireNonNull(response);
+ this.replyTo = replyTo;
+ }
+
+ public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+
+ public ActorRef<GetEntitiesReply> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class SingleEntityOwnerDataResponse extends StateCheckerCommand {
+ private final @NonNull GetResponse<LWWRegister<String>> response;
+ private final DOMEntity entity;
+ private final ActorRef<GetEntityReply> replyTo;
+
+ public SingleEntityOwnerDataResponse(final @NonNull GetResponse<LWWRegister<String>> response,
+ final DOMEntity entity,
+ final ActorRef<GetEntityReply> replyTo) {
+ this.response = requireNonNull(response);
+ this.entity = requireNonNull(entity);
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+ return response;
+ }
+
+ public DOMEntity getEntity() {
+ return entity;
+ }
+
+ public ActorRef<GetEntityReply> getReplyTo() {
+ return replyTo;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.NonNull;
+
+public abstract class StateCheckerRequest<T extends StateCheckerReply> extends StateCheckerCommand
+ implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final @NonNull ActorRef<T> replyTo;
+
+ StateCheckerRequest(final ActorRef<T> replyTo) {
+ this.replyTo = requireNonNull(replyTo);
+ }
+
+ public final @NonNull ActorRef<T> getReplyTo() {
+ return replyTo;
+ }
+}
import akka.actor.typed.javadsl.Receive;
import akka.cluster.Member;
import akka.cluster.typed.Cluster;
+import akka.pattern.StatusReply;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public Receive<OwnerSupervisorCommand> createReceive() {
return newReceiveBuilder()
.onMessage(ActivateDataCenter.class, this::onActivateDataCenter)
+ .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
.build();
}
+ private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
+ LOG.debug("Failing rpc request. {}", message);
+ message.getReplyTo().tell(StatusReply.error("OwnerSupervisor is inactive so it"
+ + " cannot handle entity rpc requests."));
+ return this;
+ }
+
private Behavior<OwnerSupervisorCommand> onActivateDataCenter(final ActivateDataCenter message) {
LOG.debug("Received ActivateDataCenter command switching to syncer behavior,");
return OwnerSyncer.create(message.getReplyTo(), iidCodec);
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
+import akka.pattern.StatusReply;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
.onMessage(MemberDownEvent.class, this::onPeerDown)
.onMessage(MemberReachableEvent.class, this::onPeerReachable)
.onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
- .onMessage(GetEntitiesRequest.class, this::onGetEntities)
- .onMessage(GetEntityRequest.class, this::onGetEntity)
- .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
+ .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
+ .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
+ .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
.build();
}
return this;
}
- private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
- request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+ private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
+ request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
return this;
}
- private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+ private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
final DOMEntity entity = extractEntity(request);
- request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+ request.getReplyTo().tell(StatusReply.success(
+ new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
return this;
}
- private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
- request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+ private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
+ request.getReplyTo().tell(
+ StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
return this;
}
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.pattern.StatusReply;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
return newReceiveBuilder()
.onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
.onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+ .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
.build();
}
+ private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
+ LOG.debug("Failing rpc request. {}", message);
+ message.getReplyTo().tell(StatusReply.error(
+ "OwnerSupervisor is inactive so it cannot handle entity rpc requests."));
+ return this;
+ }
+
private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
if (response instanceof Replicator.GetSuccess) {
package org.opendaylight.controller.eos.akka.owner.supervisor.command;
import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
private final @NonNull EntityType type;
private final @NonNull EntityName name;
- AbstractEntityRequest(final ActorRef<T> replyTo, final EntityId entity) {
+ AbstractEntityRequest(final ActorRef<StatusReply<T>> replyTo, final EntityId entity) {
super(replyTo);
this.type = entity.requireType();
this.name = entity.requireName();
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSetMultimap;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetEntitiesBackendReply extends OwnerSupervisorReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ImmutableSetMultimap<DOMEntity, String> candidates;
+ private final ImmutableMap<DOMEntity, String> owners;
+
+ public GetEntitiesBackendReply(final Map<DOMEntity, String> owners, final Map<DOMEntity, Set<String>> candidates) {
+ final ImmutableSetMultimap.Builder<DOMEntity, String> builder = ImmutableSetMultimap.builder();
+ for (Map.Entry<DOMEntity, Set<String>> entry : candidates.entrySet()) {
+ builder.putAll(entry.getKey(), entry.getValue());
+ }
+ this.candidates = builder.build();
+ this.owners = ImmutableMap.copyOf(owners);
+ }
+
+ public ImmutableSetMultimap<DOMEntity, String> getCandidates() {
+ return candidates;
+ }
+
+ public ImmutableMap<DOMEntity, String> getOwners() {
+ return owners;
+ }
+}
package org.opendaylight.controller.eos.akka.owner.supervisor.command;
import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
-public final class GetEntitiesRequest extends OwnerSupervisorRequest<GetEntitiesReply> {
+public final class GetEntitiesBackendRequest extends OwnerSupervisorRequest<GetEntitiesBackendReply> {
private static final long serialVersionUID = 1L;
- public GetEntitiesRequest(final ActorRef<GetEntitiesReply> replyTo) {
+ public GetEntitiesBackendRequest(final ActorRef<StatusReply<GetEntitiesBackendReply>> replyTo) {
super(replyTo);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.Set;
+import org.eclipse.jdt.annotation.Nullable;
+
+public final class GetEntityBackendReply extends OwnerSupervisorReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ImmutableSet<String> candidates;
+ private final String owner;
+
+ public GetEntityBackendReply(final @Nullable String owner, final @Nullable Set<String> candidates) {
+ this.owner = owner;
+ this.candidates = candidates == null ? ImmutableSet.of() : ImmutableSet.copyOf(candidates);
+ }
+
+ public ImmutableSet<String> getCandidates() {
+ return candidates;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
+
+public final class GetEntityBackendRequest extends AbstractEntityRequest<GetEntityBackendReply> {
+ private static final long serialVersionUID = 1L;
+
+ public GetEntityBackendRequest(final ActorRef<StatusReply<GetEntityBackendReply>> replyTo, final EntityId entity) {
+ super(replyTo, entity);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
+
+public final class GetEntityOwnerBackendReply extends OwnerSupervisorReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String owner;
+
+ public GetEntityOwnerBackendReply(final @Nullable String owner) {
+ this.owner = owner;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
+
+public final class GetEntityOwnerBackendRequest extends AbstractEntityRequest<GetEntityOwnerBackendReply> {
+ private static final long serialVersionUID = 1L;
+
+ public GetEntityOwnerBackendRequest(final ActorRef<StatusReply<GetEntityOwnerBackendReply>> replyTo,
+ final EntityId entity) {
+ super(replyTo, entity);
+ }
+}
import static java.util.Objects.requireNonNull;
import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
import java.io.Serializable;
import org.eclipse.jdt.annotation.NonNull;
-abstract class OwnerSupervisorRequest<T extends OwnerSupervisorReply> extends OwnerSupervisorCommand
+public abstract class OwnerSupervisorRequest<T extends OwnerSupervisorReply> extends OwnerSupervisorCommand
implements Serializable {
private static final long serialVersionUID = 1L;
- private final @NonNull ActorRef<T> replyTo;
+ private final @NonNull ActorRef<StatusReply<T>> replyTo;
- OwnerSupervisorRequest(final ActorRef<T> replyTo) {
+ OwnerSupervisorRequest(final ActorRef<StatusReply<T>> replyTo) {
this.replyTo = requireNonNull(replyTo);
}
- public final @NonNull ActorRef<T> getReplyTo() {
+ public final @NonNull ActorRef<StatusReply<T>> getReplyTo() {
return replyTo;
}
}
private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
- protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
+ protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
final List<String> seedNodes)
throws ExecutionException, InterruptedException {
final Map<String, Object> overrides = new HashMap<>();
protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
final List<String> seedNodes, final String dataCenter)
throws ExecutionException, InterruptedException {
+ final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
+ final ActorRef<BootstrapCommand> eosBootstrap =
+ Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
+
+ final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+ GetRunningContext::new,
+ Duration.ofSeconds(5),
+ Adapter.toTyped(system.scheduler()));
+ final RunningContext runningContext = ask.toCompletableFuture().get();
+
+ return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+ runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+ }
+
+ protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+ final List<String> seedNodes) {
final Map<String, Object> overrides = new HashMap<>();
overrides.put(PORT_PARAM, port);
overrides.put(ROLE_PARAM, roles);
if (!seedNodes.isEmpty()) {
overrides.put(SEED_NODES_PARAM, seedNodes);
}
- overrides.put(DATA_CENTER_PARAM, dataCenter);
final Config config = ConfigFactory.parseMap(overrides)
.withFallback(ConfigFactory.load());
// Create a classic Akka system since thats what we will have in osgi
- final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
- final ActorRef<BootstrapCommand> eosBootstrap =
- Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
+ return akka.actor.ActorSystem.create("ClusterSystem", config);
+ }
- final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
- GetRunningContext::new,
- Duration.ofSeconds(5),
- Adapter.toTyped(system.scheduler()));
- final RunningContext runningContext = ask.toCompletableFuture().get();
+ protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+ final List<String> seedNodes, final String dataCenter) {
+ final Map<String, Object> overrides = new HashMap<>();
+ overrides.put(PORT_PARAM, port);
+ overrides.put(ROLE_PARAM, roles);
+ if (!seedNodes.isEmpty()) {
+ overrides.put(SEED_NODES_PARAM, seedNodes);
+ }
+ overrides.put(DATA_CENTER_PARAM, dataCenter);
- return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
- runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+ final Config config = ConfigFactory.parseMap(overrides)
+ .withFallback(ConfigFactory.load());
+
+ // Create a classic Akka system since thats what we will have in osgi
+ return akka.actor.ActorSystem.create("ClusterSystem", config);
}
private static Behavior<BootstrapCommand> rootBehavior() {
verifyNoNotifications(listener, 2);
}
- protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
+ protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
}
protected static void verifyNoAdditionalNotifications(
- final MockEntityOwnershipListener listener, long delaySeconds) {
+ final MockEntityOwnershipListener listener, final long delaySeconds) {
listener.resetListener();
verifyNoNotifications(listener, delaySeconds);
}
}
protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
- private ActorSystem classicActorSystem;
+ private final ActorSystem classicActorSystem;
- protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
+ protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
throws ExecutionException, InterruptedException {
super(classicActorSystem, CODEC_CONTEXT);
this.classicActorSystem = classicActorSystem;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
assertNull(getEntityResult.getResult().getOwnerNode());
assertTrue(getEntityResult.getResult().getCandidateNodes().isEmpty());
+
+ final GetEntitiesOutput getEntitiesResult =
+ service.getEntities(new GetEntitiesInputBuilder().build()).get().getResult();
+
+ assertEquals(getEntitiesResult.getEntities().size(), 1);
+ assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+ new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)), new EntityType(ENTITY_TYPE)))
+ .getCandidateNodes().contains(new NodeName("member-1")));
+ assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+ new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)),
+ new EntityType(ENTITY_TYPE)))
+ .getOwnerNode().getValue().equals("member-1"));
+
+ final GetEntityOwnerOutput getOwnerResult = service.getEntityOwner(new GetEntityOwnerInputBuilder()
+ .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+ .setType(new EntityType(ENTITY_TYPE))
+ .build())
+ .get().getResult();
+
+ assertEquals(getOwnerResult.getOwnerNode().getValue(), "member-1");
}
private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.ActorSystem;
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.javadsl.Adapter;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+
+public class EntityRpcHandlerTest extends AbstractNativeEosTest {
+ static final String ENTITY_TYPE = "test";
+
+ private ActorSystem system1;
+ private ActorSystem system2;
+
+ private AkkaEntityOwnershipService service1;
+ private AkkaEntityOwnershipService service2;
+
+ @Before
+ public void setUp() throws Exception {
+ system1 = startupActorSystem(2550, List.of("member-1"), TWO_NODE_SEED_NODES);
+ system2 = startupActorSystem(2551, List.of("member-2"), TWO_NODE_SEED_NODES, "dc-backup");
+
+ service1 = new AkkaEntityOwnershipService(system1, CODEC_CONTEXT);
+ service2 = new AkkaEntityOwnershipService(system2, CODEC_CONTEXT);
+
+ // need to wait until all nodes are ready
+ final Cluster cluster = Cluster.get(Adapter.toTyped(system2));
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ final List<Member> members = new ArrayList<>();
+ cluster.state().getMembers().forEach(members::add);
+ if (members.size() != 2) {
+ return false;
+ }
+
+ for (final Member member : members) {
+ if (!member.status().equals(MemberStatus.up())) {
+ return false;
+ }
+ }
+
+ return true;
+ });
+ }
+
+ @After
+ public void tearDown() throws InterruptedException, ExecutionException {
+ service1.close();
+ service2.close();
+ ActorTestKit.shutdown(Adapter.toTyped(system1), Duration.ofSeconds(20));
+ ActorTestKit.shutdown(Adapter.toTyped(system2), Duration.ofSeconds(20));
+ }
+
+ /*
+ * Tests entity rpcs handled both by the owner supervisor(service1) and with an idle supervisor(falling
+ * back to distributed-data in an inactive datacenter). This covers both the available cases, datacenters and case
+ * in which the node with active akka-singleton is shutdown and another one takes over.
+ */
+ @Test
+ public void testEntityRetrievalWithUnavailableSupervisor() throws Exception {
+ final YangInstanceIdentifier entityId = YangInstanceIdentifier.create(new NodeIdentifier(NetworkTopology.QNAME),
+ new NodeIdentifier(Topology.QNAME),
+ NodeIdentifierWithPredicates.of(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), "test"),
+ new NodeIdentifier(Node.QNAME),
+ NodeIdentifierWithPredicates.of(Node.QNAME, QName.create(Node.QNAME, "node-id"), "test://test-node"));
+
+ final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+
+ final DOMEntityOwnershipCandidateRegistration reg = service1.registerCandidate(entity);
+
+ await().untilAsserted(() -> {
+ final RpcResult<GetEntityOutput> getEntityResult = service1.getEntity(new GetEntityInputBuilder()
+ .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+ .setType(new EntityType(ENTITY_TYPE))
+ .build())
+ .get();
+
+ assertEquals(getEntityResult.getResult().getOwnerNode().getValue(), "member-1");
+ assertEquals(getEntityResult.getResult().getCandidateNodes().get(0).getValue(), "member-1");
+ });
+
+ // keep this under ask timeout to make sure the singleton actor in the inactive datacenter responds with failure
+ // immediately, so that the rpc actor retries with distributed-data asap
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
+ final GetEntitiesOutput getEntitiesResult =
+ service2.getEntities(new GetEntitiesInputBuilder().build()).get().getResult();
+
+ assertEquals(getEntitiesResult.getEntities().size(), 1);
+ assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+ new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)),
+ new EntityType(ENTITY_TYPE)))
+ .getCandidateNodes().contains(new NodeName("member-1")));
+ assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+ new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)),
+ new EntityType(ENTITY_TYPE)))
+ .getOwnerNode().getValue().equals("member-1"));
+ });
+
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
+ final GetEntityOutput getEntityResult = service2.getEntity(new GetEntityInputBuilder()
+ .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+ .setType(new EntityType(ENTITY_TYPE))
+ .build())
+ .get().getResult();
+
+ assertEquals(getEntityResult.getOwnerNode().getValue(), "member-1");
+ assertEquals(getEntityResult.getCandidateNodes().get(0).getValue(), "member-1");
+ });
+
+ await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
+ final GetEntityOwnerOutput getOwnerResult = service2.getEntityOwner(new GetEntityOwnerInputBuilder()
+ .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+ .setType(new EntityType(ENTITY_TYPE))
+ .build())
+ .get().getResult();
+
+ assertEquals(getOwnerResult.getOwnerNode().getValue(), "member-1");
+ });
+
+ }
+}
private final Map<DOMEntity, String> currentOwners;
private MockSyncer(final ActorContext<OwnerSupervisorCommand> context,
- final Map<DOMEntity, Set<String>> currentCandidates,
- final Map<DOMEntity, String> currentOwners) {
+ final Map<DOMEntity, Set<String>> currentCandidates,
+ final Map<DOMEntity, String> currentOwners) {
super(context);
this.currentCandidates = currentCandidates;
this.currentOwners = currentOwners;
listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
- ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
// start the initial sync behavior that switches to the regular one after syncing
ownerSupervisor = clusterSingleton.init(SingletonActor.of(
MockSyncer.create(currentCandidates, currentOwners), "OwnerSupervisor"));
+
+ ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, null),
+ "OwnerStateChecker");
}
public static Behavior<BootstrapCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,