From: Tomas Cere Date: Fri, 12 Nov 2021 17:17:00 +0000 (+0100) Subject: Add an actor for entity rpc execution. X-Git-Tag: v4.0.7~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1adec580405aafae353f8b0b3a5a0f474a05c6c0 Add an actor for entity rpc execution. Instead of entity-owners rpcs being handled by owner supervisor directly add an actor that handles the execution, preventing failures that happen after the akka singleton has moved to a different node. This new actor first attempts to retrieve current data from the supervisor and if that fails, falls back to using distributed-data. JIRA: CONTROLLER-2010 Change-Id: Idf71d66d380402c10e276c726edef2f5034f7363 Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java index e815c8a6a0..9520b58d59 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java @@ -33,18 +33,18 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand; import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext; import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest; import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState; import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply; import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply; import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; @@ -221,23 +221,22 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler)); } - @Override public ListenableFuture> getEntities(final GetEntitiesInput input) { - return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler), + return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler), reply -> reply.toOutput(iidCodec)); } @Override public ListenableFuture> getEntity(final GetEntityInput input) { - return toRpcFuture(AskPattern.ask(ownerSupervisor, + return toRpcFuture(AskPattern.ask(ownerStateChecker, (final ActorRef replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler), GetEntityReply::toOutput); } @Override public ListenableFuture> getEntityOwner(final GetEntityOwnerInput input) { - return toRpcFuture(AskPattern.ask(ownerSupervisor, + return toRpcFuture(AskPattern.ask(ownerStateChecker, (final ActorRef replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT, scheduler), GetEntityOwnerReply::toOutput); } @@ -261,7 +260,7 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da return runningContext; } - private static ListenableFuture> toRpcFuture( + private static ListenableFuture> toRpcFuture( final CompletionStage stage, final Function outputFunction) { final SettableFuture> future = SettableFuture.create(); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java index c0ffa29350..be1415ed5b 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java @@ -44,12 +44,14 @@ public final class EOSMain extends AbstractBehavior { listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry"); candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry"); - ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker"); final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem()); // start the initial sync behavior that switches to the regular one after syncing ownerSupervisor = clusterSingleton.init( SingletonActor.of(IdleSupervisor.create(iidCodec), "OwnerSupervisor")); + + ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, iidCodec), + "OwnerStateChecker"); } public static Behavior create(final BindingInstanceIdentifierCodec iidCodec) { diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/EntityRpcHandler.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/EntityRpcHandler.java new file mode 100644 index 0000000000..6828009d72 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/EntityRpcHandler.java @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker; + +import static com.google.common.base.Verify.verifyNotNull; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.AskPattern; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; +import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import org.opendaylight.controller.eos.akka.owner.checker.command.AbstractEntityRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidates; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidatesForEntity; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnerForEntity; +import org.opendaylight.controller.eos.akka.owner.checker.command.OwnerDataResponse; +import org.opendaylight.controller.eos.akka.owner.checker.command.SingleEntityOwnerDataResponse; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Short-lived actor that is spawned purely for execution of rpcs from the entity-owners model. + */ +public final class EntityRpcHandler extends AbstractBehavior { + private static final Logger LOG = LoggerFactory.getLogger(EntityRpcHandler.class); + private static final Duration ASK_TIMEOUT = Duration.ofSeconds(5); + + private final ReplicatorMessageAdapter> ownerReplicator; + private final ReplicatorMessageAdapter>> candidateReplicator; + + private final ActorRef ownerSupervisor; + private final ActorRef replicator; + + private final BindingInstanceIdentifierCodec iidCodec; + + private final Map> currentCandidates = new HashMap<>(); + private final Map currentOwners = new HashMap<>(); + private final Map entityLookup = new HashMap<>(); + private int toSync = -1; + + public EntityRpcHandler(final ActorContext context, + final ActorRef ownerSupervisor, + final BindingInstanceIdentifierCodec iidCodec) { + super(context); + + replicator = DistributedData.get(context.getSystem()).replicator(); + ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, ASK_TIMEOUT); + candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, ASK_TIMEOUT); + this.ownerSupervisor = ownerSupervisor; + + this.iidCodec = iidCodec; + } + + public static Behavior create(final ActorRef ownerSupervisor, + final BindingInstanceIdentifierCodec iidCodec) { + return Behaviors.setup(ctx -> new EntityRpcHandler(ctx, ownerSupervisor, iidCodec)); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(GetEntitiesRequest.class, this::onGetEntities) + .onMessage(GetEntityRequest.class, this::onGetEntity) + .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner) + .onMessage(GetCandidates.class, this::onCandidatesReceived) + .onMessage(GetCandidatesForEntity.class, this::onCandidatesForEntityReceived) + .onMessage(OwnerDataResponse.class, this::onOwnerDataReceived) + .onMessage(SingleEntityOwnerDataResponse.class, this::onSingleOwnerReceived) + .onMessage(GetOwnerForEntity.class, this::onReplyWithOwner) + .build(); + } + + private Behavior onGetEntities(final GetEntitiesRequest request) { + LOG.debug("{} : Executing get-entities rpc.", getContext().getSelf()); + final CompletionStage result = AskPattern.askWithStatus( + ownerSupervisor, + GetEntitiesBackendRequest::new, + ASK_TIMEOUT, + getContext().getSystem().scheduler() + ); + + result.whenComplete((response, throwable) -> { + if (response != null) { + request.getReplyTo().tell(new GetEntitiesReply(response)); + } else { + // retry backed with distributed-data + LOG.debug("{} : Get-entities failed with owner supervisor, falling back to distributed-data.", + getContext().getSelf(), throwable); + getCandidates(request.getReplyTo()); + } + }); + return this; + } + + private Behavior onGetEntity(final GetEntityRequest request) { + LOG.debug("{} : Executing get-entity rpc.", getContext().getSelf()); + final CompletionStage result = AskPattern.askWithStatus( + ownerSupervisor, + replyTo -> new GetEntityBackendRequest(replyTo, request.getEntity()), + ASK_TIMEOUT, + getContext().getSystem().scheduler() + ); + + result.whenComplete((response, throwable) -> { + if (response != null) { + request.getReplyTo().tell(new GetEntityReply(response)); + } else { + // retry backed with distributed-data + LOG.debug("{} : Get-entity failed with owner supervisor, falling back to distributed-data.", + getContext().getSelf(), throwable); + getCandidatesForEntity(extractEntity(request), request.getReplyTo()); + } + }); + return this; + } + + private Behavior onGetEntityOwner(final GetEntityOwnerRequest request) { + LOG.debug("{} : Executing get-entity-owner rpc.", getContext().getSelf()); + final CompletionStage result = AskPattern.askWithStatus( + ownerSupervisor, + replyTo -> new GetEntityOwnerBackendRequest(replyTo, request.getEntity()), + ASK_TIMEOUT, + getContext().getSystem().scheduler() + ); + + result.whenComplete((response, throwable) -> { + if (response != null) { + request.getReplyTo().tell(new GetEntityOwnerReply(response.getOwner())); + } else { + // retry backed with distributed-data + LOG.debug("{} : Get-entity-owner failed with owner supervisor, falling back to distributed-data.", + getContext().getSelf(), throwable); + getOwnerForEntity(extractEntity(request), request.getReplyTo()); + } + }); + return this; + } + + private void getCandidates(final ActorRef replyTo) { + candidateReplicator.askGet( + askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo), + replicatorResponse -> new GetCandidates(replicatorResponse, replyTo)); + } + + private void getCandidatesForEntity(final DOMEntity entity, final ActorRef replyTo) { + candidateReplicator.askGet( + askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo), + replicatorResponse -> new GetCandidatesForEntity(replicatorResponse, entity, replyTo)); + } + + private void getOwnerForEntity(final DOMEntity entity, final ActorRef replyTo) { + ownerReplicator.askGet( + askReplyTo -> new Replicator.Get<>( + new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), askReplyTo), + replicatorReponse -> new GetOwnerForEntity(replicatorReponse, entity, replyTo)); + } + + private Behavior onReplyWithOwner(final GetOwnerForEntity message) { + final Replicator.GetResponse> response = message.getResponse(); + if (response instanceof Replicator.GetSuccess) { + message.getReplyTo().tell(new GetEntityOwnerReply( + ((Replicator.GetSuccess>) response).dataValue().getValue())); + } else { + LOG.debug("Unable to retrieve owner for entity: {}, response: {}", message.getEntity(), response); + message.getReplyTo().tell(new GetEntityOwnerReply("")); + } + + return Behaviors.stopped(); + } + + private Behavior onCandidatesReceived(final GetCandidates message) { + final Replicator.GetResponse>> response = message.getResponse(); + if (response instanceof Replicator.GetSuccess) { + return extractCandidates((Replicator.GetSuccess>>) response, + message.getReplyTo()); + } + + LOG.debug("Unable to retrieve candidates from distributed-data. Response: {}", response); + message.getReplyTo().tell(new GetEntitiesReply(Collections.emptyMap(), Collections.emptyMap())); + return Behaviors.stopped(); + } + + private Behavior extractCandidates( + final Replicator.GetSuccess>> response, + final ActorRef replyTo) { + final ORMap> candidates = response.get(CandidateRegistry.KEY); + candidates.getEntries().forEach((key, value) -> currentCandidates.put(key, new HashSet<>(value.getElements()))); + + toSync = candidates.keys().size(); + for (final DOMEntity entity : candidates.keys().getElements()) { + entityLookup.put(entity.toString(), entity); + + ownerReplicator.askGet( + askReplyTo -> new Replicator.Get<>( + new LWWRegisterKey<>(entity.toString()), + Replicator.readLocal(), + askReplyTo), + replicatorResponse -> new OwnerDataResponse(replicatorResponse, replyTo)); + } + + return this; + } + + private Behavior onOwnerDataReceived(final OwnerDataResponse message) { + final Replicator.GetResponse> response = message.getResponse(); + if (response instanceof Replicator.GetSuccess) { + handleOwnerRsp((Replicator.GetSuccess>) response); + } else if (response instanceof Replicator.NotFound) { + handleNotFoundOwnerRsp((Replicator.NotFound>) response); + } else { + LOG.debug("Owner retrieval failed, response: {}", response); + } + + // count the responses, on last respond to rpc and shutdown + toSync--; + if (toSync == 0) { + final GetEntitiesReply getEntitiesReply = new GetEntitiesReply(currentCandidates, currentOwners); + message.getReplyTo().tell(getEntitiesReply); + return Behaviors.stopped(); + } + + return this; + } + + private Behavior onCandidatesForEntityReceived(final GetCandidatesForEntity message) { + LOG.debug("Received CandidatesForEntity: {}", message); + final Replicator.GetResponse>> response = message.getResponse(); + if (response instanceof Replicator.GetSuccess) { + return extractCandidatesForEntity((Replicator.GetSuccess>>) response, + message.getEntity(), message.getReplyTo()); + } else { + LOG.debug("Unable to retrieve candidates for entity: {}. Response:: {}", message.getEntity(), response); + message.getReplyTo().tell(new GetEntityReply(null, Collections.emptySet())); + return this; + } + } + + private Behavior extractCandidatesForEntity( + final Replicator.GetSuccess>> response, final DOMEntity entity, + final ActorRef replyTo) { + final Map> entries = response.get(CandidateRegistry.KEY).getEntries(); + currentCandidates.put(entity, entries.get(entity).getElements()); + + entityLookup.put(entity.toString(), entity); + ownerReplicator.askGet( + askReplyTo -> new Replicator.Get<>( + new LWWRegisterKey<>(entity.toString()), + Replicator.readLocal(), + askReplyTo), + replicatorResponse -> new SingleEntityOwnerDataResponse(replicatorResponse, entity, replyTo)); + + return this; + } + + private void handleOwnerRsp(final Replicator.GetSuccess> rsp) { + final DOMEntity entity = entityLookup.get(rsp.key().id()); + final String owner = rsp.get(rsp.key()).getValue(); + + currentOwners.put(entity, owner); + } + + private static void handleNotFoundOwnerRsp(final Replicator.NotFound> rsp) { + LOG.debug("Owner not found. {}", rsp); + } + + private Behavior onSingleOwnerReceived(final SingleEntityOwnerDataResponse message) { + LOG.debug("Received owner for single entity: {}", message); + final Replicator.GetResponse> response = message.getResponse(); + final GetEntityReply reply; + if (response instanceof Replicator.GetSuccess) { + reply = new GetEntityReply(((Replicator.GetSuccess>) response).dataValue().getValue(), + currentCandidates.get(message.getEntity())); + } else { + reply = new GetEntityReply(null, currentCandidates.get(message.getEntity())); + } + + message.getReplyTo().tell(reply); + return Behaviors.stopped(); + } + + private DOMEntity extractEntity(final AbstractEntityRequest request) { + final var name = request.getName(); + final var iid = name.getInstanceIdentifier(); + if (iid != null) { + return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid)); + } + final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name); + return new DOMEntity(request.getType().getValue(), str); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java index 57c40aca78..6d418a80e7 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.eos.akka.owner.checker; import static java.util.Objects.requireNonNull; +import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; @@ -17,6 +18,7 @@ import akka.actor.typed.javadsl.Receive; import akka.cluster.ddata.LWWRegister; import akka.cluster.ddata.LWWRegisterKey; import akka.cluster.ddata.typed.javadsl.DistributedData; +import akka.cluster.ddata.typed.javadsl.Replicator; import akka.cluster.ddata.typed.javadsl.Replicator.Get; import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure; import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; @@ -25,10 +27,16 @@ import akka.cluster.ddata.typed.javadsl.Replicator.NotFound; import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority; import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; import java.time.Duration; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest; +import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest; import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState; import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply; import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply; import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; +import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,18 +46,28 @@ public final class OwnerStateChecker extends AbstractBehavior> replicatorAdapter; + private final ReplicatorMessageAdapter> ownerReplicator; + private final ActorRef ownerSupervisor; + private final BindingInstanceIdentifierCodec iidCodec; + private final ActorRef replicator; private final String localMember; - private OwnerStateChecker(final ActorContext context, final String localMember) { + private OwnerStateChecker(final ActorContext context, + final String localMember, + final ActorRef ownerSupervisor, + final BindingInstanceIdentifierCodec iidCodec) { super(context); this.localMember = requireNonNull(localMember); - replicatorAdapter = new ReplicatorMessageAdapter<>(context, - DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT); + this.ownerSupervisor = requireNonNull(ownerSupervisor); + this.iidCodec = requireNonNull(iidCodec); + replicator = DistributedData.get(context.getSystem()).replicator(); + ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, UNEXPECTED_ASK_TIMEOUT); } - public static Behavior create(final String localMember) { - return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember)); + public static Behavior create(final String localMember, + final ActorRef ownerSupervisor, + final BindingInstanceIdentifierCodec iidCodec) { + return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember, ownerSupervisor, iidCodec)); } @Override @@ -57,11 +75,14 @@ public final class OwnerStateChecker extends AbstractBehavior onGetOwnershipState(final GetOwnershipState message) { - replicatorAdapter.askGet( + ownerReplicator.askGet( askReplyTo -> new Get<>( new LWWRegisterKey<>(message.getEntity().toString()), new ReadMajority(GET_OWNERSHIP_TIMEOUT), @@ -90,4 +111,13 @@ public final class OwnerStateChecker extends AbstractBehavior executeEntityRpc(final StateCheckerRequest request) { + final ActorRef rpcHandler = + getContext().spawnAnonymous(EntityRpcHandler.create(ownerSupervisor, iidCodec)); + + LOG.debug("Executing entity rpc: {} in actor: {}", request, rpcHandler); + rpcHandler.tell(request); + return this; + } } diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/AbstractEntityRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/AbstractEntityRequest.java new file mode 100644 index 0000000000..ae1dfc2110 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/AbstractEntityRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import akka.actor.typed.ActorRef; +import com.google.common.base.MoreObjects; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesBuilder; + +public abstract class AbstractEntityRequest extends StateCheckerRequest { + private static final long serialVersionUID = 1L; + + private final @NonNull EntityType type; + private final @NonNull EntityName name; + + AbstractEntityRequest(final ActorRef replyTo, final EntityId entity) { + super(replyTo); + type = entity.requireType(); + name = entity.requireName(); + } + + public final @NonNull EntityId getEntity() { + return new EntitiesBuilder().setType(type).setName(name).build(); + } + + public final @NonNull EntityType getType() { + return type; + } + + public final @NonNull EntityName getName() { + return name; + } + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("type", type).add("name", name).toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidates.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidates.java new file mode 100644 index 0000000000..50dddb3977 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidates.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class GetCandidates extends StateCheckerCommand { + private final @Nullable GetResponse>> response; + private final @NonNull ActorRef replyTo; + + public GetCandidates(final GetResponse>> response, + final ActorRef replyTo) { + this.response = response; + this.replyTo = requireNonNull(replyTo); + } + + public @Nullable GetResponse>> getResponse() { + return response; + } + + public @NonNull ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidatesForEntity.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidatesForEntity.java new file mode 100644 index 0000000000..14a545be0c --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidatesForEntity.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class GetCandidatesForEntity extends StateCheckerCommand { + private final @Nullable GetResponse>> response; + private final @NonNull DOMEntity entity; + private final @NonNull ActorRef replyTo; + + public GetCandidatesForEntity(final GetResponse>> response, + final DOMEntity entity, final ActorRef replyTo) { + this.response = response; + this.entity = requireNonNull(entity); + this.replyTo = requireNonNull(replyTo); + } + + public @Nullable GetResponse>> getResponse() { + return response; + } + + public @NonNull DOMEntity getEntity() { + return entity; + } + + public @NonNull ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesReply.java similarity index 85% rename from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesReply.java rename to opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesReply.java index 1cfa31ab00..c66f84ddf0 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesReply.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesReply.java @@ -5,21 +5,20 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.eos.akka.owner.supervisor.command; +package org.opendaylight.controller.eos.akka.owner.checker.command; import static com.google.common.base.Verify.verify; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSetMultimap; -import com.google.common.collect.ImmutableSetMultimap.Builder; import com.google.common.collect.Iterables; import java.io.Serializable; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply; import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName; @@ -33,15 +32,20 @@ import org.opendaylight.yangtools.yang.binding.util.BindingMap; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -public final class GetEntitiesReply extends OwnerSupervisorReply implements Serializable { +public final class GetEntitiesReply extends StateCheckerReply implements Serializable { private static final long serialVersionUID = 1L; private final ImmutableSetMultimap candidates; private final ImmutableMap owners; - public GetEntitiesReply(final Map owners, final Map> candidates) { - final Builder builder = ImmutableSetMultimap.builder(); - for (Entry> entry : candidates.entrySet()) { + public GetEntitiesReply(final GetEntitiesBackendReply response) { + this.owners = response.getOwners(); + this.candidates = response.getCandidates(); + } + + public GetEntitiesReply(final Map> candidates, final Map owners) { + final ImmutableSetMultimap.Builder builder = ImmutableSetMultimap.builder(); + for (Map.Entry> entry : candidates.entrySet()) { builder.putAll(entry.getKey(), entry.getValue()); } this.candidates = builder.build(); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesRequest.java new file mode 100644 index 0000000000..8894c27e53 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesRequest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import akka.actor.typed.ActorRef; + +public final class GetEntitiesRequest extends StateCheckerRequest { + private static final long serialVersionUID = 1L; + + public GetEntitiesRequest(final ActorRef replyTo) { + super(replyTo); + } + + @Override + public String toString() { + return "GetEntitiesRequest{} " + super.toString(); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerReply.java similarity index 88% rename from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerReply.java rename to opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerReply.java index 282fc4140d..10ccde7de1 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerReply.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerReply.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.eos.akka.owner.supervisor.command; +package org.opendaylight.controller.eos.akka.owner.checker.command; import java.io.Serializable; import org.eclipse.jdt.annotation.NonNull; @@ -14,7 +14,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName; -public final class GetEntityOwnerReply extends OwnerSupervisorReply implements Serializable { +public final class GetEntityOwnerReply extends StateCheckerReply implements Serializable { private static final long serialVersionUID = 1L; private final String owner; diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerRequest.java similarity index 91% rename from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerRequest.java rename to opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerRequest.java index 6b341d3840..dedccee8f1 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerRequest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerRequest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.eos.akka.owner.supervisor.command; +package org.opendaylight.controller.eos.akka.owner.checker.command; import akka.actor.typed.ActorRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId; diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityReply.java similarity index 79% rename from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityReply.java rename to opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityReply.java index 6fae25c5af..2185a86a2c 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityReply.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityReply.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.eos.akka.owner.supervisor.command; +package org.opendaylight.controller.eos.akka.owner.checker.command; import com.google.common.collect.ImmutableSet; import java.io.Serializable; @@ -13,16 +13,22 @@ import java.util.Set; import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName; -public final class GetEntityReply extends OwnerSupervisorReply implements Serializable { +public final class GetEntityReply extends StateCheckerReply implements Serializable { private static final long serialVersionUID = 1L; private final ImmutableSet candidates; private final String owner; + public GetEntityReply(final GetEntityBackendReply backendReply) { + candidates = backendReply.getCandidates(); + owner = backendReply.getOwner(); + } + public GetEntityReply(final @Nullable String owner, final @Nullable Set candidates) { this.owner = owner; this.candidates = candidates == null ? ImmutableSet.of() : ImmutableSet.copyOf(candidates); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityRequest.java similarity index 91% rename from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityRequest.java rename to opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityRequest.java index 91b73c8d60..2d9d46b49a 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityRequest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityRequest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.eos.akka.owner.supervisor.command; +package org.opendaylight.controller.eos.akka.owner.checker.command; import akka.actor.typed.ActorRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId; diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnerForEntity.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnerForEntity.java new file mode 100644 index 0000000000..71105cd7d5 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnerForEntity.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class GetOwnerForEntity extends StateCheckerCommand { + private final @NonNull GetResponse> response; + private final DOMEntity entity; + private final ActorRef replyTo; + + public GetOwnerForEntity(final @NonNull GetResponse> response, + final DOMEntity entity, final ActorRef replyTo) { + this.response = response; + this.entity = entity; + this.replyTo = replyTo; + } + + public GetResponse> getResponse() { + return response; + } + + public DOMEntity getEntity() { + return entity; + } + + public ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/OwnerDataResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/OwnerDataResponse.java new file mode 100644 index 0000000000..b7b612f941 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/OwnerDataResponse.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; + +public class OwnerDataResponse extends StateCheckerCommand { + private final @NonNull GetResponse> response; + private final ActorRef replyTo; + + public OwnerDataResponse(final GetResponse> response, + final ActorRef replyTo) { + this.response = requireNonNull(response); + this.replyTo = replyTo; + } + + public @NonNull GetResponse> getResponse() { + return response; + } + + public ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/SingleEntityOwnerDataResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/SingleEntityOwnerDataResponse.java new file mode 100644 index 0000000000..94539957ef --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/SingleEntityOwnerDataResponse.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import akka.cluster.ddata.LWWRegister; +import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public class SingleEntityOwnerDataResponse extends StateCheckerCommand { + private final @NonNull GetResponse> response; + private final DOMEntity entity; + private final ActorRef replyTo; + + public SingleEntityOwnerDataResponse(final @NonNull GetResponse> response, + final DOMEntity entity, + final ActorRef replyTo) { + this.response = requireNonNull(response); + this.entity = requireNonNull(entity); + this.replyTo = requireNonNull(replyTo); + } + + public @NonNull GetResponse> getResponse() { + return response; + } + + public DOMEntity getEntity() { + return entity; + } + + public ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerRequest.java new file mode 100644 index 0000000000..62be328891 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerRequest.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.checker.command; + +import static java.util.Objects.requireNonNull; + +import akka.actor.typed.ActorRef; +import java.io.Serializable; +import org.eclipse.jdt.annotation.NonNull; + +public abstract class StateCheckerRequest extends StateCheckerCommand + implements Serializable { + private static final long serialVersionUID = 1L; + + private final @NonNull ActorRef replyTo; + + StateCheckerRequest(final ActorRef replyTo) { + this.replyTo = requireNonNull(replyTo); + } + + public final @NonNull ActorRef getReplyTo() { + return replyTo; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java index 7dcfb51218..2baeb62fc3 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java @@ -16,8 +16,13 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.Member; import akka.cluster.typed.Cluster; +import akka.pattern.StatusReply; import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest; import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,9 +64,19 @@ public final class IdleSupervisor extends AbstractBehavior createReceive() { return newReceiveBuilder() .onMessage(ActivateDataCenter.class, this::onActivateDataCenter) + .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc) + .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc) + .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc) .build(); } + private Behavior onFailEntityRpc(final OwnerSupervisorRequest message) { + LOG.debug("Failing rpc request. {}", message); + message.getReplyTo().tell(StatusReply.error("OwnerSupervisor is inactive so it" + + " cannot handle entity rpc requests.")); + return this; + } + private Behavior onActivateDataCenter(final ActivateDataCenter message) { LOG.debug("Received ActivateDataCenter command switching to syncer behavior,"); return OwnerSyncer.create(message.getReplyTo(), iidCodec); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java index f66f25aa82..9841b65b7b 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java @@ -29,6 +29,7 @@ import akka.cluster.ddata.typed.javadsl.Replicator; import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; import akka.cluster.typed.Cluster; import akka.cluster.typed.Subscribe; +import akka.pattern.StatusReply; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -48,12 +49,12 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEnt import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply; -import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; @@ -172,9 +173,9 @@ public final class OwnerSupervisor extends AbstractBehavior onGetEntities(final GetEntitiesRequest request) { - request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates)); + private Behavior onGetEntities(final GetEntitiesBackendRequest request) { + request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates))); return this; } - private Behavior onGetEntity(final GetEntityRequest request) { + private Behavior onGetEntity(final GetEntityBackendRequest request) { final DOMEntity entity = extractEntity(request); - request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity))); + request.getReplyTo().tell(StatusReply.success( + new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity)))); return this; } - private Behavior onGetEntityOwner(final GetEntityOwnerRequest request) { - request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request)))); + private Behavior onGetEntityOwner(final GetEntityOwnerBackendRequest request) { + request.getReplyTo().tell( + StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request))))); return this; } diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java index 04d27ee4dc..092f532dfb 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java @@ -22,6 +22,7 @@ import akka.cluster.ddata.ORSet; import akka.cluster.ddata.typed.javadsl.DistributedData; import akka.cluster.ddata.typed.javadsl.Replicator; import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import akka.pattern.StatusReply; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; @@ -29,10 +30,14 @@ import java.util.Map; import java.util.Set; import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync; import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest; import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; @@ -87,9 +92,19 @@ public final class OwnerSyncer extends AbstractBehavior return newReceiveBuilder() .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync) .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync) + .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc) + .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc) + .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc) .build(); } + private Behavior onFailEntityRpc(final OwnerSupervisorRequest message) { + LOG.debug("Failing rpc request. {}", message); + message.getReplyTo().tell(StatusReply.error( + "OwnerSupervisor is inactive so it cannot handle entity rpc requests.")); + return this; + } + private Behavior onInitialCandidateSync(final InitialCandidateSync rsp) { final Replicator.GetResponse>> response = rsp.getResponse(); if (response instanceof Replicator.GetSuccess) { diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/AbstractEntityRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/AbstractEntityRequest.java index b3b4c8cff8..5919a6ee73 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/AbstractEntityRequest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/AbstractEntityRequest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.eos.akka.owner.supervisor.command; import akka.actor.typed.ActorRef; +import akka.pattern.StatusReply; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName; @@ -19,7 +20,7 @@ public abstract class AbstractEntityRequest exte private final @NonNull EntityType type; private final @NonNull EntityName name; - AbstractEntityRequest(final ActorRef replyTo, final EntityId entity) { + AbstractEntityRequest(final ActorRef> replyTo, final EntityId entity) { super(replyTo); this.type = entity.requireType(); this.name = entity.requireName(); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendReply.java new file mode 100644 index 0000000000..beb858c8df --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendReply.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSetMultimap; +import java.io.Serializable; +import java.util.Map; +import java.util.Set; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; + +public final class GetEntitiesBackendReply extends OwnerSupervisorReply implements Serializable { + private static final long serialVersionUID = 1L; + + private final ImmutableSetMultimap candidates; + private final ImmutableMap owners; + + public GetEntitiesBackendReply(final Map owners, final Map> candidates) { + final ImmutableSetMultimap.Builder builder = ImmutableSetMultimap.builder(); + for (Map.Entry> entry : candidates.entrySet()) { + builder.putAll(entry.getKey(), entry.getValue()); + } + this.candidates = builder.build(); + this.owners = ImmutableMap.copyOf(owners); + } + + public ImmutableSetMultimap getCandidates() { + return candidates; + } + + public ImmutableMap getOwners() { + return owners; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendRequest.java similarity index 67% rename from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesRequest.java rename to opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendRequest.java index fdea832798..69ea7dff5a 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesRequest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendRequest.java @@ -8,11 +8,12 @@ package org.opendaylight.controller.eos.akka.owner.supervisor.command; import akka.actor.typed.ActorRef; +import akka.pattern.StatusReply; -public final class GetEntitiesRequest extends OwnerSupervisorRequest { +public final class GetEntitiesBackendRequest extends OwnerSupervisorRequest { private static final long serialVersionUID = 1L; - public GetEntitiesRequest(final ActorRef replyTo) { + public GetEntitiesBackendRequest(final ActorRef> replyTo) { super(replyTo); } } diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendReply.java new file mode 100644 index 0000000000..6130603171 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendReply.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import com.google.common.collect.ImmutableSet; +import java.io.Serializable; +import java.util.Set; +import org.eclipse.jdt.annotation.Nullable; + +public final class GetEntityBackendReply extends OwnerSupervisorReply implements Serializable { + private static final long serialVersionUID = 1L; + + private final ImmutableSet candidates; + private final String owner; + + public GetEntityBackendReply(final @Nullable String owner, final @Nullable Set candidates) { + this.owner = owner; + this.candidates = candidates == null ? ImmutableSet.of() : ImmutableSet.copyOf(candidates); + } + + public ImmutableSet getCandidates() { + return candidates; + } + + public String getOwner() { + return owner; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendRequest.java new file mode 100644 index 0000000000..0fa7842287 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendRequest.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.actor.typed.ActorRef; +import akka.pattern.StatusReply; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId; + +public final class GetEntityBackendRequest extends AbstractEntityRequest { + private static final long serialVersionUID = 1L; + + public GetEntityBackendRequest(final ActorRef> replyTo, final EntityId entity) { + super(replyTo, entity); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendReply.java new file mode 100644 index 0000000000..d41185ad5b --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendReply.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import java.io.Serializable; +import org.eclipse.jdt.annotation.Nullable; + +public final class GetEntityOwnerBackendReply extends OwnerSupervisorReply implements Serializable { + private static final long serialVersionUID = 1L; + + private final String owner; + + public GetEntityOwnerBackendReply(final @Nullable String owner) { + this.owner = owner; + } + + public String getOwner() { + return owner; + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendRequest.java new file mode 100644 index 0000000000..11802cee8f --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendRequest.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka.owner.supervisor.command; + +import akka.actor.typed.ActorRef; +import akka.pattern.StatusReply; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId; + +public final class GetEntityOwnerBackendRequest extends AbstractEntityRequest { + private static final long serialVersionUID = 1L; + + public GetEntityOwnerBackendRequest(final ActorRef> replyTo, + final EntityId entity) { + super(replyTo, entity); + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorRequest.java index ab822a7f02..c451681610 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorRequest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorRequest.java @@ -10,20 +10,21 @@ package org.opendaylight.controller.eos.akka.owner.supervisor.command; import static java.util.Objects.requireNonNull; import akka.actor.typed.ActorRef; +import akka.pattern.StatusReply; import java.io.Serializable; import org.eclipse.jdt.annotation.NonNull; -abstract class OwnerSupervisorRequest extends OwnerSupervisorCommand +public abstract class OwnerSupervisorRequest extends OwnerSupervisorCommand implements Serializable { private static final long serialVersionUID = 1L; - private final @NonNull ActorRef replyTo; + private final @NonNull ActorRef> replyTo; - OwnerSupervisorRequest(final ActorRef replyTo) { + OwnerSupervisorRequest(final ActorRef> replyTo) { this.replyTo = requireNonNull(replyTo); } - public final @NonNull ActorRef getReplyTo() { + public final @NonNull ActorRef> getReplyTo() { return replyTo; } } diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java index 365ef85676..27b4bcba84 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java @@ -92,7 +92,7 @@ public abstract class AbstractNativeEosTest { private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes"; private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center"; - protected static MockNativeEntityOwnershipService startupNativeService(final int port, List roles, + protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List roles, final List seedNodes) throws ExecutionException, InterruptedException { final Map overrides = new HashMap<>(); @@ -163,30 +163,51 @@ public abstract class AbstractNativeEosTest { protected static ClusterNode startupWithDatacenter(final int port, final List roles, final List seedNodes, final String dataCenter) throws ExecutionException, InterruptedException { + final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter); + final ActorRef eosBootstrap = + Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap"); + + final CompletionStage ask = AskPattern.ask(eosBootstrap, + GetRunningContext::new, + Duration.ofSeconds(5), + Adapter.toTyped(system.scheduler())); + final RunningContext runningContext = ask.toCompletableFuture().get(); + + return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), + runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); + } + + protected static akka.actor.ActorSystem startupActorSystem(final int port, final List roles, + final List seedNodes) { final Map overrides = new HashMap<>(); overrides.put(PORT_PARAM, port); overrides.put(ROLE_PARAM, roles); if (!seedNodes.isEmpty()) { overrides.put(SEED_NODES_PARAM, seedNodes); } - overrides.put(DATA_CENTER_PARAM, dataCenter); final Config config = ConfigFactory.parseMap(overrides) .withFallback(ConfigFactory.load()); // Create a classic Akka system since thats what we will have in osgi - final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config); - final ActorRef eosBootstrap = - Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap"); + return akka.actor.ActorSystem.create("ClusterSystem", config); + } - final CompletionStage ask = AskPattern.ask(eosBootstrap, - GetRunningContext::new, - Duration.ofSeconds(5), - Adapter.toTyped(system.scheduler())); - final RunningContext runningContext = ask.toCompletableFuture().get(); + protected static akka.actor.ActorSystem startupActorSystem(final int port, final List roles, + final List seedNodes, final String dataCenter) { + final Map overrides = new HashMap<>(); + overrides.put(PORT_PARAM, port); + overrides.put(ROLE_PARAM, roles); + if (!seedNodes.isEmpty()) { + overrides.put(SEED_NODES_PARAM, seedNodes); + } + overrides.put(DATA_CENTER_PARAM, dataCenter); - return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), - runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); + final Config config = ConfigFactory.parseMap(overrides) + .withFallback(ConfigFactory.load()); + + // Create a classic Akka system since thats what we will have in osgi + return akka.actor.ActorSystem.create("ClusterSystem", config); } private static Behavior rootBehavior() { @@ -300,12 +321,12 @@ public abstract class AbstractNativeEosTest { verifyNoNotifications(listener, 2); } - protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) { + protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) { await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty()); } protected static void verifyNoAdditionalNotifications( - final MockEntityOwnershipListener listener, long delaySeconds) { + final MockEntityOwnershipListener listener, final long delaySeconds) { listener.resetListener(); verifyNoNotifications(listener, delaySeconds); } @@ -393,9 +414,9 @@ public abstract class AbstractNativeEosTest { } protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService { - private ActorSystem classicActorSystem; + private final ActorSystem classicActorSystem; - protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem) + protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem) throws ExecutionException, InterruptedException { super(classicActorSystem, CODEC_CONTEXT); this.classicActorSystem = classicActorSystem; diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java index b707b84f2e..199e7931bb 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java @@ -45,8 +45,14 @@ import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; @@ -221,6 +227,26 @@ public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest { assertNull(getEntityResult.getResult().getOwnerNode()); assertTrue(getEntityResult.getResult().getCandidateNodes().isEmpty()); + + final GetEntitiesOutput getEntitiesResult = + service.getEntities(new GetEntitiesInputBuilder().build()).get().getResult(); + + assertEquals(getEntitiesResult.getEntities().size(), 1); + assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey( + new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)), new EntityType(ENTITY_TYPE))) + .getCandidateNodes().contains(new NodeName("member-1"))); + assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey( + new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)), + new EntityType(ENTITY_TYPE))) + .getOwnerNode().getValue().equals("member-1")); + + final GetEntityOwnerOutput getOwnerResult = service.getEntityOwner(new GetEntityOwnerInputBuilder() + .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId))) + .setType(new EntityType(ENTITY_TYPE)) + .build()) + .get().getResult(); + + assertEquals(getOwnerResult.getOwnerNode().getValue(), "member-1"); } private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity, diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/EntityRpcHandlerTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/EntityRpcHandlerTest.java new file mode 100644 index 0000000000..3e43be1b49 --- /dev/null +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/EntityRpcHandlerTest.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.eos.akka; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import akka.actor.ActorSystem; +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import akka.actor.typed.javadsl.Adapter; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.typed.Cluster; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.mdsal.eos.dom.api.DOMEntity; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesKey; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; + +public class EntityRpcHandlerTest extends AbstractNativeEosTest { + static final String ENTITY_TYPE = "test"; + + private ActorSystem system1; + private ActorSystem system2; + + private AkkaEntityOwnershipService service1; + private AkkaEntityOwnershipService service2; + + @Before + public void setUp() throws Exception { + system1 = startupActorSystem(2550, List.of("member-1"), TWO_NODE_SEED_NODES); + system2 = startupActorSystem(2551, List.of("member-2"), TWO_NODE_SEED_NODES, "dc-backup"); + + service1 = new AkkaEntityOwnershipService(system1, CODEC_CONTEXT); + service2 = new AkkaEntityOwnershipService(system2, CODEC_CONTEXT); + + // need to wait until all nodes are ready + final Cluster cluster = Cluster.get(Adapter.toTyped(system2)); + Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> { + final List members = new ArrayList<>(); + cluster.state().getMembers().forEach(members::add); + if (members.size() != 2) { + return false; + } + + for (final Member member : members) { + if (!member.status().equals(MemberStatus.up())) { + return false; + } + } + + return true; + }); + } + + @After + public void tearDown() throws InterruptedException, ExecutionException { + service1.close(); + service2.close(); + ActorTestKit.shutdown(Adapter.toTyped(system1), Duration.ofSeconds(20)); + ActorTestKit.shutdown(Adapter.toTyped(system2), Duration.ofSeconds(20)); + } + + /* + * Tests entity rpcs handled both by the owner supervisor(service1) and with an idle supervisor(falling + * back to distributed-data in an inactive datacenter). This covers both the available cases, datacenters and case + * in which the node with active akka-singleton is shutdown and another one takes over. + */ + @Test + public void testEntityRetrievalWithUnavailableSupervisor() throws Exception { + final YangInstanceIdentifier entityId = YangInstanceIdentifier.create(new NodeIdentifier(NetworkTopology.QNAME), + new NodeIdentifier(Topology.QNAME), + NodeIdentifierWithPredicates.of(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), "test"), + new NodeIdentifier(Node.QNAME), + NodeIdentifierWithPredicates.of(Node.QNAME, QName.create(Node.QNAME, "node-id"), "test://test-node")); + + final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); + + final DOMEntityOwnershipCandidateRegistration reg = service1.registerCandidate(entity); + + await().untilAsserted(() -> { + final RpcResult getEntityResult = service1.getEntity(new GetEntityInputBuilder() + .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId))) + .setType(new EntityType(ENTITY_TYPE)) + .build()) + .get(); + + assertEquals(getEntityResult.getResult().getOwnerNode().getValue(), "member-1"); + assertEquals(getEntityResult.getResult().getCandidateNodes().get(0).getValue(), "member-1"); + }); + + // keep this under ask timeout to make sure the singleton actor in the inactive datacenter responds with failure + // immediately, so that the rpc actor retries with distributed-data asap + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> { + final GetEntitiesOutput getEntitiesResult = + service2.getEntities(new GetEntitiesInputBuilder().build()).get().getResult(); + + assertEquals(getEntitiesResult.getEntities().size(), 1); + assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey( + new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)), + new EntityType(ENTITY_TYPE))) + .getCandidateNodes().contains(new NodeName("member-1"))); + assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey( + new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)), + new EntityType(ENTITY_TYPE))) + .getOwnerNode().getValue().equals("member-1")); + }); + + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> { + final GetEntityOutput getEntityResult = service2.getEntity(new GetEntityInputBuilder() + .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId))) + .setType(new EntityType(ENTITY_TYPE)) + .build()) + .get().getResult(); + + assertEquals(getEntityResult.getOwnerNode().getValue(), "member-1"); + assertEquals(getEntityResult.getCandidateNodes().get(0).getValue(), "member-1"); + }); + + await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> { + final GetEntityOwnerOutput getOwnerResult = service2.getEntityOwner(new GetEntityOwnerInputBuilder() + .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId))) + .setType(new EntityType(ENTITY_TYPE)) + .build()) + .get().getResult(); + + assertEquals(getOwnerResult.getOwnerNode().getValue(), "member-1"); + }); + + } +} diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java index b396053e29..4bf68a7ef1 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java @@ -117,8 +117,8 @@ public class OwnerSupervisorTest extends AbstractNativeEosTest { private final Map currentOwners; private MockSyncer(final ActorContext context, - final Map> currentCandidates, - final Map currentOwners) { + final Map> currentCandidates, + final Map currentOwners) { super(context); this.currentCandidates = currentCandidates; this.currentOwners = currentOwners; @@ -163,12 +163,14 @@ public class OwnerSupervisorTest extends AbstractNativeEosTest { listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry"); candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry"); - ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker"); final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem()); // start the initial sync behavior that switches to the regular one after syncing ownerSupervisor = clusterSingleton.init(SingletonActor.of( MockSyncer.create(currentCandidates, currentOwners), "OwnerSupervisor")); + + ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, null), + "OwnerStateChecker"); } public static Behavior create(final Map> currentCandidates,