Add an actor for entity rpc execution. 66/98466/2
authorTomas Cere <tomas.cere@pantheon.tech>
Fri, 12 Nov 2021 17:17:00 +0000 (18:17 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 14 Nov 2021 07:58:08 +0000 (08:58 +0100)
Instead of entity-owners rpcs being handled by owner supervisor
directly add an actor that handles the execution, preventing failures
that happen after the akka singleton has moved to a different node.

This new actor first attempts to retrieve current data from the supervisor
and if that fails, falls back to using distributed-data.

JIRA: CONTROLLER-2010
Change-Id: Idf71d66d380402c10e276c726edef2f5034f7363
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
32 files changed:
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/EntityRpcHandler.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/OwnerStateChecker.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/AbstractEntityRequest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidates.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidatesForEntity.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesReply.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesReply.java with 85% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesRequest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerReply.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerReply.java with 88% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityOwnerRequest.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerRequest.java with 91% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityReply.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityReply.java with 79% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntityRequest.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityRequest.java with 91% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnerForEntity.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/OwnerDataResponse.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/SingleEntityOwnerDataResponse.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerRequest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/AbstractEntityRequest.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendRequest.java [moved from opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesRequest.java with 67% similarity]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendRequest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendRequest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorRequest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/EntityRpcHandlerTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java

index e815c8a6a0cc886661c2b38e80106b0fd4743150..9520b58d59f852b640d5d2a66a64799b2b493201 100644 (file)
@@ -33,18 +33,18 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -221,23 +221,22 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
             AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
     }
 
-
     @Override
     public ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
-        return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
+        return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
                 reply -> reply.toOutput(iidCodec));
     }
 
     @Override
     public ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
-        return toRpcFuture(AskPattern.ask(ownerSupervisor,
+        return toRpcFuture(AskPattern.ask(ownerStateChecker,
             (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
             GetEntityReply::toOutput);
     }
 
     @Override
     public ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
-        return toRpcFuture(AskPattern.ask(ownerSupervisor,
+        return toRpcFuture(AskPattern.ask(ownerStateChecker,
             (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
             scheduler), GetEntityOwnerReply::toOutput);
     }
@@ -261,7 +260,7 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
         return runningContext;
     }
 
-    private static <R extends OwnerSupervisorReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
+    private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
             final CompletionStage<R> stage, final Function<R, O> outputFunction) {
 
         final SettableFuture<RpcResult<O>> future = SettableFuture.create();
index c0ffa29350f34493ebe3ba76f6ee44903d31b234..be1415ed5bf7a1f2e1f0d6b603ba07f4a1ca1447 100644 (file)
@@ -44,12 +44,14 @@ public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
 
         listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
         candidateRegistry = context.spawn(CandidateRegistryInit.create(), "CandidateRegistry");
-        ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
 
         final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
         // start the initial sync behavior that switches to the regular one after syncing
         ownerSupervisor = clusterSingleton.init(
                 SingletonActor.of(IdleSupervisor.create(iidCodec), "OwnerSupervisor"));
+
+        ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, iidCodec),
+                "OwnerStateChecker");
     }
 
     public static Behavior<BootstrapCommand> create(final BindingInstanceIdentifierCodec iidCodec) {
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/EntityRpcHandler.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/EntityRpcHandler.java
new file mode 100644 (file)
index 0000000..6828009
--- /dev/null
@@ -0,0 +1,328 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker;
+
+import static com.google.common.base.Verify.verifyNotNull;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.AskPattern;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.eos.akka.owner.checker.command.AbstractEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidates;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetCandidatesForEntity;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnerForEntity;
+import org.opendaylight.controller.eos.akka.owner.checker.command.OwnerDataResponse;
+import org.opendaylight.controller.eos.akka.owner.checker.command.SingleEntityOwnerDataResponse;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Short-lived actor that is spawned purely for execution of rpcs from the entity-owners model.
+ */
+public final class EntityRpcHandler extends AbstractBehavior<StateCheckerCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityRpcHandler.class);
+    private static final Duration ASK_TIMEOUT = Duration.ofSeconds(5);
+
+    private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
+    private final ReplicatorMessageAdapter<StateCheckerCommand, ORMap<DOMEntity, ORSet<String>>> candidateReplicator;
+
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+    private final ActorRef<Replicator.Command> replicator;
+
+    private final BindingInstanceIdentifierCodec iidCodec;
+
+    private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
+    private final Map<DOMEntity, String> currentOwners = new HashMap<>();
+    private final Map<String, DOMEntity> entityLookup = new HashMap<>();
+    private int toSync = -1;
+
+    public EntityRpcHandler(final ActorContext<StateCheckerCommand> context,
+                            final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                            final BindingInstanceIdentifierCodec iidCodec) {
+        super(context);
+
+        replicator = DistributedData.get(context.getSystem()).replicator();
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, ASK_TIMEOUT);
+        candidateReplicator = new ReplicatorMessageAdapter<>(getContext(), replicator, ASK_TIMEOUT);
+        this.ownerSupervisor = ownerSupervisor;
+
+        this.iidCodec = iidCodec;
+    }
+
+    public static Behavior<StateCheckerCommand> create(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                                       final BindingInstanceIdentifierCodec iidCodec) {
+        return Behaviors.setup(ctx -> new EntityRpcHandler(ctx, ownerSupervisor, iidCodec));
+    }
+
+    @Override
+    public Receive<StateCheckerCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(GetEntitiesRequest.class, this::onGetEntities)
+                .onMessage(GetEntityRequest.class, this::onGetEntity)
+                .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
+                .onMessage(GetCandidates.class, this::onCandidatesReceived)
+                .onMessage(GetCandidatesForEntity.class, this::onCandidatesForEntityReceived)
+                .onMessage(OwnerDataResponse.class, this::onOwnerDataReceived)
+                .onMessage(SingleEntityOwnerDataResponse.class, this::onSingleOwnerReceived)
+                .onMessage(GetOwnerForEntity.class, this::onReplyWithOwner)
+                .build();
+    }
+
+    private Behavior<StateCheckerCommand> onGetEntities(final GetEntitiesRequest request) {
+        LOG.debug("{} : Executing get-entities rpc.", getContext().getSelf());
+        final CompletionStage<GetEntitiesBackendReply> result = AskPattern.askWithStatus(
+                ownerSupervisor,
+                GetEntitiesBackendRequest::new,
+                ASK_TIMEOUT,
+                getContext().getSystem().scheduler()
+        );
+
+        result.whenComplete((response, throwable) -> {
+            if (response != null) {
+                request.getReplyTo().tell(new GetEntitiesReply(response));
+            } else {
+                // retry backed with distributed-data
+                LOG.debug("{} : Get-entities failed with owner supervisor, falling back to distributed-data.",
+                        getContext().getSelf(), throwable);
+                getCandidates(request.getReplyTo());
+            }
+        });
+        return this;
+    }
+
+    private Behavior<StateCheckerCommand> onGetEntity(final GetEntityRequest request) {
+        LOG.debug("{} : Executing get-entity rpc.", getContext().getSelf());
+        final CompletionStage<GetEntityBackendReply> result = AskPattern.askWithStatus(
+                ownerSupervisor,
+                replyTo -> new GetEntityBackendRequest(replyTo, request.getEntity()),
+                ASK_TIMEOUT,
+                getContext().getSystem().scheduler()
+        );
+
+        result.whenComplete((response, throwable) -> {
+            if (response != null) {
+                request.getReplyTo().tell(new GetEntityReply(response));
+            } else {
+                // retry backed with distributed-data
+                LOG.debug("{} : Get-entity failed with owner supervisor, falling back to distributed-data.",
+                        getContext().getSelf(), throwable);
+                getCandidatesForEntity(extractEntity(request), request.getReplyTo());
+            }
+        });
+        return this;
+    }
+
+    private Behavior<StateCheckerCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
+        LOG.debug("{} : Executing get-entity-owner rpc.", getContext().getSelf());
+        final CompletionStage<GetEntityOwnerBackendReply> result = AskPattern.askWithStatus(
+                ownerSupervisor,
+                replyTo -> new GetEntityOwnerBackendRequest(replyTo, request.getEntity()),
+                ASK_TIMEOUT,
+                getContext().getSystem().scheduler()
+        );
+
+        result.whenComplete((response, throwable) -> {
+            if (response != null) {
+                request.getReplyTo().tell(new GetEntityOwnerReply(response.getOwner()));
+            } else {
+                // retry backed with distributed-data
+                LOG.debug("{} : Get-entity-owner failed with owner supervisor, falling back to distributed-data.",
+                        getContext().getSelf(), throwable);
+                getOwnerForEntity(extractEntity(request), request.getReplyTo());
+            }
+        });
+        return this;
+    }
+
+    private void getCandidates(final ActorRef<GetEntitiesReply> replyTo) {
+        candidateReplicator.askGet(
+                askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
+                replicatorResponse -> new GetCandidates(replicatorResponse, replyTo));
+    }
+
+    private void getCandidatesForEntity(final DOMEntity entity, final ActorRef<GetEntityReply> replyTo) {
+        candidateReplicator.askGet(
+                askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
+                replicatorResponse -> new GetCandidatesForEntity(replicatorResponse, entity, replyTo));
+    }
+
+    private void getOwnerForEntity(final DOMEntity entity, final ActorRef<GetEntityOwnerReply> replyTo) {
+        ownerReplicator.askGet(
+                askReplyTo -> new Replicator.Get<>(
+                        new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), askReplyTo),
+                replicatorReponse -> new GetOwnerForEntity(replicatorReponse, entity, replyTo));
+    }
+
+    private Behavior<StateCheckerCommand> onReplyWithOwner(final GetOwnerForEntity message) {
+        final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            message.getReplyTo().tell(new GetEntityOwnerReply(
+                    ((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue()));
+        } else {
+            LOG.debug("Unable to retrieve owner for entity: {}, response: {}", message.getEntity(), response);
+            message.getReplyTo().tell(new GetEntityOwnerReply(""));
+        }
+
+        return Behaviors.stopped();
+    }
+
+    private Behavior<StateCheckerCommand> onCandidatesReceived(final GetCandidates message) {
+        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            return extractCandidates((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
+                    message.getReplyTo());
+        }
+
+        LOG.debug("Unable to retrieve candidates from distributed-data. Response: {}", response);
+        message.getReplyTo().tell(new GetEntitiesReply(Collections.emptyMap(), Collections.emptyMap()));
+        return Behaviors.stopped();
+    }
+
+    private Behavior<StateCheckerCommand> extractCandidates(
+            final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response,
+            final ActorRef<GetEntitiesReply> replyTo) {
+        final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
+        candidates.getEntries().forEach((key, value) -> currentCandidates.put(key, new HashSet<>(value.getElements())));
+
+        toSync = candidates.keys().size();
+        for (final DOMEntity entity : candidates.keys().getElements()) {
+            entityLookup.put(entity.toString(), entity);
+
+            ownerReplicator.askGet(
+                    askReplyTo -> new Replicator.Get<>(
+                            new LWWRegisterKey<>(entity.toString()),
+                            Replicator.readLocal(),
+                            askReplyTo),
+                    replicatorResponse -> new OwnerDataResponse(replicatorResponse, replyTo));
+        }
+
+        return this;
+    }
+
+    private Behavior<StateCheckerCommand> onOwnerDataReceived(final OwnerDataResponse message) {
+        final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
+        } else if (response instanceof Replicator.NotFound) {
+            handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
+        } else {
+            LOG.debug("Owner retrieval failed, response: {}", response);
+        }
+
+        // count the responses, on last respond to rpc and shutdown
+        toSync--;
+        if (toSync == 0) {
+            final GetEntitiesReply getEntitiesReply = new GetEntitiesReply(currentCandidates, currentOwners);
+            message.getReplyTo().tell(getEntitiesReply);
+            return Behaviors.stopped();
+        }
+
+        return this;
+    }
+
+    private Behavior<StateCheckerCommand> onCandidatesForEntityReceived(final GetCandidatesForEntity message) {
+        LOG.debug("Received CandidatesForEntity: {}", message);
+        final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = message.getResponse();
+        if (response instanceof Replicator.GetSuccess) {
+            return extractCandidatesForEntity((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response,
+                    message.getEntity(), message.getReplyTo());
+        } else {
+            LOG.debug("Unable to retrieve candidates for entity: {}. Response:: {}", message.getEntity(), response);
+            message.getReplyTo().tell(new GetEntityReply(null, Collections.emptySet()));
+            return this;
+        }
+    }
+
+    private Behavior<StateCheckerCommand> extractCandidatesForEntity(
+            final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response, final DOMEntity entity,
+            final ActorRef<GetEntityReply> replyTo) {
+        final Map<DOMEntity, ORSet<String>> entries = response.get(CandidateRegistry.KEY).getEntries();
+        currentCandidates.put(entity, entries.get(entity).getElements());
+
+        entityLookup.put(entity.toString(), entity);
+        ownerReplicator.askGet(
+                askReplyTo -> new Replicator.Get<>(
+                        new LWWRegisterKey<>(entity.toString()),
+                        Replicator.readLocal(),
+                        askReplyTo),
+                replicatorResponse -> new SingleEntityOwnerDataResponse(replicatorResponse, entity, replyTo));
+
+        return this;
+    }
+
+    private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
+        final DOMEntity entity = entityLookup.get(rsp.key().id());
+        final String owner = rsp.get(rsp.key()).getValue();
+
+        currentOwners.put(entity, owner);
+    }
+
+    private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
+        LOG.debug("Owner not found. {}", rsp);
+    }
+
+    private Behavior<StateCheckerCommand> onSingleOwnerReceived(final SingleEntityOwnerDataResponse message) {
+        LOG.debug("Received owner for single entity: {}", message);
+        final Replicator.GetResponse<LWWRegister<String>> response = message.getResponse();
+        final GetEntityReply reply;
+        if (response instanceof Replicator.GetSuccess) {
+            reply = new GetEntityReply(((Replicator.GetSuccess<LWWRegister<String>>) response).dataValue().getValue(),
+                    currentCandidates.get(message.getEntity()));
+        } else {
+            reply = new GetEntityReply(null, currentCandidates.get(message.getEntity()));
+        }
+
+        message.getReplyTo().tell(reply);
+        return Behaviors.stopped();
+    }
+
+    private DOMEntity extractEntity(final AbstractEntityRequest<?> request) {
+        final var name = request.getName();
+        final var iid = name.getInstanceIdentifier();
+        if (iid != null) {
+            return new DOMEntity(request.getType().getValue(), iidCodec.fromBinding(iid));
+        }
+        final var str = verifyNotNull(name.getString(), "Unhandled entity name %s", name);
+        return new DOMEntity(request.getType().getValue(), str);
+    }
+}
index 57c40aca7854c13f3dcb420836777260125d0794..6d418a80e7d9b873988b4e5f0a97acfe6ca279ee 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.eos.akka.owner.checker;
 
 import static java.util.Objects.requireNonNull;
 
+import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
 import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
@@ -17,6 +18,7 @@ import akka.actor.typed.javadsl.Receive;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.Replicator.Get;
 import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
 import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
@@ -25,10 +27,16 @@ import akka.cluster.ddata.typed.javadsl.Replicator.NotFound;
 import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
 import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,18 +46,28 @@ public final class OwnerStateChecker extends AbstractBehavior<StateCheckerComman
     private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
     private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
 
-    private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
+    private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+    private final BindingInstanceIdentifierCodec iidCodec;
+    private final ActorRef<Replicator.Command> replicator;
     private final String localMember;
 
-    private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
+    private OwnerStateChecker(final ActorContext<StateCheckerCommand> context,
+                              final String localMember,
+                              final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                              final BindingInstanceIdentifierCodec iidCodec) {
         super(context);
         this.localMember = requireNonNull(localMember);
-        replicatorAdapter = new ReplicatorMessageAdapter<>(context,
-            DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
+        this.ownerSupervisor = requireNonNull(ownerSupervisor);
+        this.iidCodec = requireNonNull(iidCodec);
+        replicator = DistributedData.get(context.getSystem()).replicator();
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, UNEXPECTED_ASK_TIMEOUT);
     }
 
-    public static Behavior<StateCheckerCommand> create(final String localMember) {
-        return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
+    public static Behavior<StateCheckerCommand> create(final String localMember,
+                                                       final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                                       final BindingInstanceIdentifierCodec iidCodec) {
+        return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember, ownerSupervisor, iidCodec));
     }
 
     @Override
@@ -57,11 +75,14 @@ public final class OwnerStateChecker extends AbstractBehavior<StateCheckerComman
         return newReceiveBuilder()
                 .onMessage(GetOwnershipState.class, this::onGetOwnershipState)
                 .onMessage(InternalGetReply.class, this::respondWithState)
+                .onMessage(GetEntitiesRequest.class, this::executeEntityRpc)
+                .onMessage(GetEntityRequest.class, this::executeEntityRpc)
+                .onMessage(GetEntityOwnerRequest.class, this::executeEntityRpc)
                 .build();
     }
 
     private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
-        replicatorAdapter.askGet(
+        ownerReplicator.askGet(
                 askReplyTo -> new Get<>(
                         new LWWRegisterKey<>(message.getEntity().toString()),
                         new ReadMajority(GET_OWNERSHIP_TIMEOUT),
@@ -90,4 +111,13 @@ public final class OwnerStateChecker extends AbstractBehavior<StateCheckerComman
         }
         return this;
     }
+
+    private Behavior<StateCheckerCommand> executeEntityRpc(final StateCheckerRequest request) {
+        final ActorRef<StateCheckerCommand> rpcHandler =
+                getContext().spawnAnonymous(EntityRpcHandler.create(ownerSupervisor, iidCodec));
+
+        LOG.debug("Executing entity rpc: {} in actor: {}", request, rpcHandler);
+        rpcHandler.tell(request);
+        return this;
+    }
 }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/AbstractEntityRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/AbstractEntityRequest.java
new file mode 100644 (file)
index 0000000..ae1dfc2
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import akka.actor.typed.ActorRef;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesBuilder;
+
+public abstract class AbstractEntityRequest<T extends StateCheckerReply> extends StateCheckerRequest<T> {
+    private static final long serialVersionUID = 1L;
+
+    private final @NonNull EntityType type;
+    private final @NonNull EntityName name;
+
+    AbstractEntityRequest(final ActorRef<T> replyTo, final EntityId entity) {
+        super(replyTo);
+        type = entity.requireType();
+        name = entity.requireName();
+    }
+
+    public final @NonNull EntityId getEntity() {
+        return new EntitiesBuilder().setType(type).setName(name).build();
+    }
+
+    public final @NonNull EntityType getType() {
+        return type;
+    }
+
+    public final @NonNull EntityName getName() {
+        return name;
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("type", type).add("name", name).toString();
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidates.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidates.java
new file mode 100644 (file)
index 0000000..50dddb3
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetCandidates extends StateCheckerCommand {
+    private final @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+    private final @NonNull ActorRef<GetEntitiesReply> replyTo;
+
+    public GetCandidates(final GetResponse<ORMap<DOMEntity, ORSet<String>>> response,
+                         final ActorRef<GetEntitiesReply> replyTo) {
+        this.response = response;
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+
+    public @NonNull ActorRef<GetEntitiesReply> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidatesForEntity.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetCandidatesForEntity.java
new file mode 100644 (file)
index 0000000..14a545b
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetCandidatesForEntity extends StateCheckerCommand {
+    private final @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> response;
+    private final @NonNull DOMEntity entity;
+    private final @NonNull ActorRef<GetEntityReply> replyTo;
+
+    public GetCandidatesForEntity(final GetResponse<ORMap<DOMEntity, ORSet<String>>> response,
+                                  final DOMEntity entity, final ActorRef<GetEntityReply> replyTo) {
+        this.response = response;
+        this.entity = requireNonNull(entity);
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public @Nullable GetResponse<ORMap<DOMEntity, ORSet<String>>> getResponse() {
+        return response;
+    }
+
+    public @NonNull DOMEntity getEntity() {
+        return entity;
+    }
+
+    public @NonNull ActorRef<GetEntityReply> getReplyTo() {
+        return replyTo;
+    }
+}
@@ -5,21 +5,20 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
 
 import static com.google.common.base.Verify.verify;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSetMultimap;
-import com.google.common.collect.ImmutableSetMultimap.Builder;
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
@@ -33,15 +32,20 @@ import org.opendaylight.yangtools.yang.binding.util.BindingMap;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 
-public final class GetEntitiesReply extends OwnerSupervisorReply implements Serializable {
+public final class GetEntitiesReply extends StateCheckerReply implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final ImmutableSetMultimap<DOMEntity, String> candidates;
     private final ImmutableMap<DOMEntity, String> owners;
 
-    public GetEntitiesReply(final Map<DOMEntity, String> owners, final Map<DOMEntity, Set<String>> candidates) {
-        final Builder<DOMEntity, String> builder = ImmutableSetMultimap.builder();
-        for (Entry<DOMEntity, Set<String>> entry : candidates.entrySet()) {
+    public GetEntitiesReply(final GetEntitiesBackendReply response) {
+        this.owners = response.getOwners();
+        this.candidates = response.getCandidates();
+    }
+
+    public GetEntitiesReply(final Map<DOMEntity, Set<String>> candidates, final Map<DOMEntity, String> owners) {
+        final ImmutableSetMultimap.Builder<DOMEntity, String> builder = ImmutableSetMultimap.builder();
+        for (Map.Entry<DOMEntity, Set<String>> entry : candidates.entrySet()) {
             builder.putAll(entry.getKey(), entry.getValue());
         }
         this.candidates = builder.build();
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetEntitiesRequest.java
new file mode 100644 (file)
index 0000000..8894c27
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import akka.actor.typed.ActorRef;
+
+public final class GetEntitiesRequest extends StateCheckerRequest<GetEntitiesReply> {
+    private static final long serialVersionUID = 1L;
+
+    public GetEntitiesRequest(final ActorRef<GetEntitiesReply> replyTo) {
+        super(replyTo);
+    }
+
+    @Override
+    public String toString() {
+        return "GetEntitiesRequest{} " + super.toString();
+    }
+}
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
 
 import java.io.Serializable;
 import org.eclipse.jdt.annotation.NonNull;
@@ -14,7 +14,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
 
-public final class GetEntityOwnerReply extends OwnerSupervisorReply implements Serializable {
+public final class GetEntityOwnerReply extends StateCheckerReply implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String owner;
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
 
 import akka.actor.typed.ActorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
 
 import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
@@ -13,16 +13,22 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
 
-public final class GetEntityReply extends OwnerSupervisorReply implements Serializable {
+public final class GetEntityReply extends StateCheckerReply implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final ImmutableSet<String> candidates;
     private final String owner;
 
+    public GetEntityReply(final GetEntityBackendReply backendReply) {
+        candidates = backendReply.getCandidates();
+        owner = backendReply.getOwner();
+    }
+
     public GetEntityReply(final @Nullable String owner, final @Nullable Set<String> candidates) {
         this.owner = owner;
         this.candidates = candidates == null ? ImmutableSet.of() : ImmutableSet.copyOf(candidates);
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+package org.opendaylight.controller.eos.akka.owner.checker.command;
 
 import akka.actor.typed.ActorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnerForEntity.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/GetOwnerForEntity.java
new file mode 100644 (file)
index 0000000..71105cd
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class GetOwnerForEntity extends StateCheckerCommand {
+    private final @NonNull GetResponse<LWWRegister<String>> response;
+    private final DOMEntity entity;
+    private final ActorRef<GetEntityOwnerReply> replyTo;
+
+    public GetOwnerForEntity(final @NonNull GetResponse<LWWRegister<String>> response,
+                             final DOMEntity entity, final ActorRef<GetEntityOwnerReply> replyTo) {
+        this.response = response;
+        this.entity = entity;
+        this.replyTo = replyTo;
+    }
+
+    public GetResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+
+    public DOMEntity getEntity() {
+        return entity;
+    }
+
+    public ActorRef<GetEntityOwnerReply> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/OwnerDataResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/OwnerDataResponse.java
new file mode 100644 (file)
index 0000000..b7b612f
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+public class OwnerDataResponse extends StateCheckerCommand {
+    private final @NonNull GetResponse<LWWRegister<String>> response;
+    private final ActorRef<GetEntitiesReply> replyTo;
+
+    public OwnerDataResponse(final GetResponse<LWWRegister<String>> response,
+                             final ActorRef<GetEntitiesReply> replyTo) {
+        this.response = requireNonNull(response);
+        this.replyTo = replyTo;
+    }
+
+    public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+
+    public ActorRef<GetEntitiesReply> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/SingleEntityOwnerDataResponse.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/SingleEntityOwnerDataResponse.java
new file mode 100644 (file)
index 0000000..9453995
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class SingleEntityOwnerDataResponse extends StateCheckerCommand {
+    private final @NonNull GetResponse<LWWRegister<String>> response;
+    private final DOMEntity entity;
+    private final ActorRef<GetEntityReply> replyTo;
+
+    public SingleEntityOwnerDataResponse(final @NonNull GetResponse<LWWRegister<String>> response,
+                                         final DOMEntity entity,
+                                         final ActorRef<GetEntityReply> replyTo) {
+        this.response = requireNonNull(response);
+        this.entity = requireNonNull(entity);
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public @NonNull GetResponse<LWWRegister<String>> getResponse() {
+        return response;
+    }
+
+    public DOMEntity getEntity() {
+        return entity;
+    }
+
+    public ActorRef<GetEntityReply> getReplyTo() {
+        return replyTo;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/checker/command/StateCheckerRequest.java
new file mode 100644 (file)
index 0000000..62be328
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.checker.command;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.typed.ActorRef;
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.NonNull;
+
+public abstract class StateCheckerRequest<T extends StateCheckerReply> extends StateCheckerCommand
+        implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final @NonNull ActorRef<T> replyTo;
+
+    StateCheckerRequest(final ActorRef<T> replyTo) {
+        this.replyTo = requireNonNull(replyTo);
+    }
+
+    public final @NonNull ActorRef<T> getReplyTo() {
+        return replyTo;
+    }
+}
index 7dcfb512189bf5f2b3bb0be3fc3038f48cd65efb..2baeb62fc354028c541673d4a1ba569a752ea81b 100644 (file)
@@ -16,8 +16,13 @@ import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
 import akka.cluster.Member;
 import akka.cluster.typed.Cluster;
+import akka.pattern.StatusReply;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,9 +64,19 @@ public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorComman
     public Receive<OwnerSupervisorCommand> createReceive() {
         return newReceiveBuilder()
                 .onMessage(ActivateDataCenter.class, this::onActivateDataCenter)
+                .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
                 .build();
     }
 
+    private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
+        LOG.debug("Failing rpc request. {}", message);
+        message.getReplyTo().tell(StatusReply.error("OwnerSupervisor is inactive so it"
+                + " cannot handle entity rpc requests."));
+        return this;
+    }
+
     private Behavior<OwnerSupervisorCommand> onActivateDataCenter(final ActivateDataCenter message) {
         LOG.debug("Received ActivateDataCenter command switching to syncer behavior,");
         return OwnerSyncer.create(message.getReplyTo(), iidCodec);
index f66f25aa821d2267cc18633b9752e626171da23f..9841b65b7bd7e53fff42ccbef4987369822b3b59 100644 (file)
@@ -29,6 +29,7 @@ import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
 import akka.cluster.typed.Cluster;
 import akka.cluster.typed.Subscribe;
+import akka.pattern.StatusReply;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -48,12 +49,12 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.AbstractEnt
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
@@ -172,9 +173,9 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .onMessage(MemberDownEvent.class, this::onPeerDown)
                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
                 .onMessage(MemberUnreachableEvent.class, this::onPeerUnreachable)
-                .onMessage(GetEntitiesRequest.class, this::onGetEntities)
-                .onMessage(GetEntityRequest.class, this::onGetEntity)
-                .onMessage(GetEntityOwnerRequest.class, this::onGetEntityOwner)
+                .onMessage(GetEntitiesBackendRequest.class, this::onGetEntities)
+                .onMessage(GetEntityBackendRequest.class, this::onGetEntity)
+                .onMessage(GetEntityOwnerBackendRequest.class, this::onGetEntityOwner)
                 .build();
     }
 
@@ -373,19 +374,21 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         return this;
     }
 
-    private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesRequest request) {
-        request.getReplyTo().tell(new GetEntitiesReply(currentOwners, currentCandidates));
+    private Behavior<OwnerSupervisorCommand> onGetEntities(final GetEntitiesBackendRequest request) {
+        request.getReplyTo().tell(StatusReply.success(new GetEntitiesBackendReply(currentOwners, currentCandidates)));
         return this;
     }
 
-    private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityRequest request) {
+    private Behavior<OwnerSupervisorCommand> onGetEntity(final GetEntityBackendRequest request) {
         final DOMEntity entity = extractEntity(request);
-        request.getReplyTo().tell(new GetEntityReply(currentOwners.get(entity), currentCandidates.get(entity)));
+        request.getReplyTo().tell(StatusReply.success(
+                new GetEntityBackendReply(currentOwners.get(entity), currentCandidates.get(entity))));
         return this;
     }
 
-    private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerRequest request) {
-        request.getReplyTo().tell(new GetEntityOwnerReply(currentOwners.get(extractEntity(request))));
+    private Behavior<OwnerSupervisorCommand> onGetEntityOwner(final GetEntityOwnerBackendRequest request) {
+        request.getReplyTo().tell(
+                StatusReply.success(new GetEntityOwnerBackendReply(currentOwners.get(extractEntity(request)))));
         return this;
     }
 
index 04d27ee4dc47b80c1612f4386118535e23d33044..092f532dfbd06eb6a0097aef157627c4dfd19752 100644 (file)
@@ -22,6 +22,7 @@ import akka.cluster.ddata.ORSet;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
 import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.pattern.StatusReply;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,10 +30,14 @@ import java.util.Map;
 import java.util.Set;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
@@ -87,9 +92,19 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
         return newReceiveBuilder()
                 .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
                 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+                .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
                 .build();
     }
 
+    private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
+        LOG.debug("Failing rpc request. {}", message);
+        message.getReplyTo().tell(StatusReply.error(
+            "OwnerSupervisor is inactive so it cannot handle entity rpc requests."));
+        return this;
+    }
+
     private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
         if (response instanceof Replicator.GetSuccess) {
index b3b4c8cff82e9375286236009e3454c99361de90..5919a6ee737abf7400fbfd1ed5af203b0ab2de3a 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.eos.akka.owner.supervisor.command;
 
 import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
@@ -19,7 +20,7 @@ public abstract class AbstractEntityRequest<T extends OwnerSupervisorReply> exte
     private final @NonNull EntityType type;
     private final @NonNull EntityName name;
 
-    AbstractEntityRequest(final ActorRef<T> replyTo, final EntityId entity) {
+    AbstractEntityRequest(final ActorRef<StatusReply<T>> replyTo, final EntityId entity) {
         super(replyTo);
         this.type = entity.requireType();
         this.name = entity.requireName();
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntitiesBackendReply.java
new file mode 100644 (file)
index 0000000..beb858c
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSetMultimap;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public final class GetEntitiesBackendReply extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ImmutableSetMultimap<DOMEntity, String> candidates;
+    private final ImmutableMap<DOMEntity, String> owners;
+
+    public GetEntitiesBackendReply(final Map<DOMEntity, String> owners, final Map<DOMEntity, Set<String>> candidates) {
+        final ImmutableSetMultimap.Builder<DOMEntity, String> builder = ImmutableSetMultimap.builder();
+        for (Map.Entry<DOMEntity, Set<String>> entry : candidates.entrySet()) {
+            builder.putAll(entry.getKey(), entry.getValue());
+        }
+        this.candidates = builder.build();
+        this.owners = ImmutableMap.copyOf(owners);
+    }
+
+    public ImmutableSetMultimap<DOMEntity, String>  getCandidates() {
+        return candidates;
+    }
+
+    public ImmutableMap<DOMEntity, String> getOwners() {
+        return owners;
+    }
+}
@@ -8,11 +8,12 @@
 package org.opendaylight.controller.eos.akka.owner.supervisor.command;
 
 import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
 
-public final class GetEntitiesRequest extends OwnerSupervisorRequest<GetEntitiesReply> {
+public final class GetEntitiesBackendRequest extends OwnerSupervisorRequest<GetEntitiesBackendReply> {
     private static final long serialVersionUID = 1L;
 
-    public GetEntitiesRequest(final ActorRef<GetEntitiesReply> replyTo) {
+    public GetEntitiesBackendRequest(final ActorRef<StatusReply<GetEntitiesBackendReply>> replyTo) {
         super(replyTo);
     }
 }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendReply.java
new file mode 100644 (file)
index 0000000..6130603
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.Set;
+import org.eclipse.jdt.annotation.Nullable;
+
+public final class GetEntityBackendReply extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ImmutableSet<String> candidates;
+    private final String owner;
+
+    public GetEntityBackendReply(final @Nullable String owner, final @Nullable Set<String> candidates) {
+        this.owner = owner;
+        this.candidates = candidates == null ? ImmutableSet.of() : ImmutableSet.copyOf(candidates);
+    }
+
+    public ImmutableSet<String> getCandidates() {
+        return candidates;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityBackendRequest.java
new file mode 100644 (file)
index 0000000..0fa7842
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
+
+public final class GetEntityBackendRequest extends AbstractEntityRequest<GetEntityBackendReply> {
+    private static final long serialVersionUID = 1L;
+
+    public GetEntityBackendRequest(final ActorRef<StatusReply<GetEntityBackendReply>> replyTo, final EntityId entity) {
+        super(replyTo, entity);
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendReply.java
new file mode 100644 (file)
index 0000000..d41185a
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
+
+public final class GetEntityOwnerBackendReply extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String owner;
+
+    public GetEntityOwnerBackendReply(final @Nullable String owner) {
+        this.owner = owner;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendRequest.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/GetEntityOwnerBackendRequest.java
new file mode 100644 (file)
index 0000000..11802ce
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityId;
+
+public final class GetEntityOwnerBackendRequest extends AbstractEntityRequest<GetEntityOwnerBackendReply> {
+    private static final long serialVersionUID = 1L;
+
+    public GetEntityOwnerBackendRequest(final ActorRef<StatusReply<GetEntityOwnerBackendReply>> replyTo,
+                                        final EntityId entity) {
+        super(replyTo, entity);
+    }
+}
index ab822a7f02c2bdbc5b383a1d79d4ed7c81393e80..c451681610976eaca4bb1d9bc80758065e8b985b 100644 (file)
@@ -10,20 +10,21 @@ package org.opendaylight.controller.eos.akka.owner.supervisor.command;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.typed.ActorRef;
+import akka.pattern.StatusReply;
 import java.io.Serializable;
 import org.eclipse.jdt.annotation.NonNull;
 
-abstract class OwnerSupervisorRequest<T extends OwnerSupervisorReply> extends OwnerSupervisorCommand
+public abstract class OwnerSupervisorRequest<T extends OwnerSupervisorReply> extends OwnerSupervisorCommand
         implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final @NonNull ActorRef<T> replyTo;
+    private final @NonNull ActorRef<StatusReply<T>> replyTo;
 
-    OwnerSupervisorRequest(final ActorRef<T> replyTo) {
+    OwnerSupervisorRequest(final ActorRef<StatusReply<T>> replyTo) {
         this.replyTo = requireNonNull(replyTo);
     }
 
-    public final @NonNull ActorRef<T> getReplyTo() {
+    public final @NonNull ActorRef<StatusReply<T>> getReplyTo() {
         return replyTo;
     }
 }
index 365ef85676db447caed3ed916d9b11ee653b9dfd..27b4bcba84deb45a14db68bf4c906bc6d4e96b08 100644 (file)
@@ -92,7 +92,7 @@ public abstract class AbstractNativeEosTest {
     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
 
-    protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
+    protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
                                                                            final List<String> seedNodes)
             throws ExecutionException, InterruptedException {
         final Map<String, Object> overrides = new HashMap<>();
@@ -163,30 +163,51 @@ public abstract class AbstractNativeEosTest {
     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
                                                        final List<String> seedNodes, final String dataCenter)
             throws ExecutionException, InterruptedException {
+        final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
+        final ActorRef<BootstrapCommand> eosBootstrap =
+                Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
+
+        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+                GetRunningContext::new,
+                Duration.ofSeconds(5),
+                Adapter.toTyped(system.scheduler()));
+        final RunningContext runningContext = ask.toCompletableFuture().get();
+
+        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+    }
+
+    protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+                                                               final List<String> seedNodes) {
         final Map<String, Object> overrides = new HashMap<>();
         overrides.put(PORT_PARAM, port);
         overrides.put(ROLE_PARAM, roles);
         if (!seedNodes.isEmpty()) {
             overrides.put(SEED_NODES_PARAM, seedNodes);
         }
-        overrides.put(DATA_CENTER_PARAM, dataCenter);
 
         final Config config = ConfigFactory.parseMap(overrides)
                 .withFallback(ConfigFactory.load());
 
         // Create a classic Akka system since thats what we will have in osgi
-        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
-        final ActorRef<BootstrapCommand> eosBootstrap =
-                Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
+        return akka.actor.ActorSystem.create("ClusterSystem", config);
+    }
 
-        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
-                GetRunningContext::new,
-                Duration.ofSeconds(5),
-                Adapter.toTyped(system.scheduler()));
-        final RunningContext runningContext = ask.toCompletableFuture().get();
+    protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+                                                               final List<String> seedNodes, final String dataCenter) {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+        overrides.put(DATA_CENTER_PARAM, dataCenter);
 
-        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
-                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        return akka.actor.ActorSystem.create("ClusterSystem", config);
     }
 
     private static Behavior<BootstrapCommand> rootBehavior() {
@@ -300,12 +321,12 @@ public abstract class AbstractNativeEosTest {
         verifyNoNotifications(listener, 2);
     }
 
-    protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
+    protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
         await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
     }
 
     protected static void verifyNoAdditionalNotifications(
-            final MockEntityOwnershipListener listener, long delaySeconds) {
+            final MockEntityOwnershipListener listener, final long delaySeconds) {
         listener.resetListener();
         verifyNoNotifications(listener, delaySeconds);
     }
@@ -393,9 +414,9 @@ public abstract class AbstractNativeEosTest {
     }
 
     protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
-        private ActorSystem classicActorSystem;
+        private final ActorSystem classicActorSystem;
 
-        protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
+        protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
                 throws ExecutionException, InterruptedException {
             super(classicActorSystem, CODEC_CONTEXT);
             this.classicActorSystem = classicActorSystem;
index b707b84f2ec1cef04187c53c1c52d716af14bdc6..199e7931bb1e8a6234302a91ff2e1b23f0f3c970 100644 (file)
@@ -45,8 +45,14 @@ import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
@@ -221,6 +227,26 @@ public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest {
 
         assertNull(getEntityResult.getResult().getOwnerNode());
         assertTrue(getEntityResult.getResult().getCandidateNodes().isEmpty());
+
+        final GetEntitiesOutput getEntitiesResult =
+                service.getEntities(new GetEntitiesInputBuilder().build()).get().getResult();
+
+        assertEquals(getEntitiesResult.getEntities().size(), 1);
+        assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+                new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)), new EntityType(ENTITY_TYPE)))
+                .getCandidateNodes().contains(new NodeName("member-1")));
+        assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+                        new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)),
+                        new EntityType(ENTITY_TYPE)))
+                .getOwnerNode().getValue().equals("member-1"));
+
+        final GetEntityOwnerOutput getOwnerResult = service.getEntityOwner(new GetEntityOwnerInputBuilder()
+                        .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+                        .setType(new EntityType(ENTITY_TYPE))
+                        .build())
+                .get().getResult();
+
+        assertEquals(getOwnerResult.getOwnerNode().getValue(), "member-1");
     }
 
     private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/EntityRpcHandlerTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/EntityRpcHandlerTest.java
new file mode 100644 (file)
index 0000000..3e43be1
--- /dev/null
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.ActorSystem;
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.javadsl.Adapter;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.NodeName;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.get.entities.output.EntitiesKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+
+public class EntityRpcHandlerTest extends AbstractNativeEosTest {
+    static final String ENTITY_TYPE = "test";
+
+    private ActorSystem system1;
+    private ActorSystem system2;
+
+    private AkkaEntityOwnershipService service1;
+    private AkkaEntityOwnershipService service2;
+
+    @Before
+    public void setUp() throws Exception {
+        system1 = startupActorSystem(2550, List.of("member-1"), TWO_NODE_SEED_NODES);
+        system2 = startupActorSystem(2551, List.of("member-2"), TWO_NODE_SEED_NODES, "dc-backup");
+
+        service1 = new AkkaEntityOwnershipService(system1, CODEC_CONTEXT);
+        service2 = new AkkaEntityOwnershipService(system2, CODEC_CONTEXT);
+
+        // need to wait until all nodes are ready
+        final Cluster cluster = Cluster.get(Adapter.toTyped(system2));
+        Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = new ArrayList<>();
+            cluster.state().getMembers().forEach(members::add);
+            if (members.size() != 2) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+    }
+
+    @After
+    public void tearDown() throws InterruptedException, ExecutionException {
+        service1.close();
+        service2.close();
+        ActorTestKit.shutdown(Adapter.toTyped(system1), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(Adapter.toTyped(system2), Duration.ofSeconds(20));
+    }
+
+    /*
+     * Tests entity rpcs handled both by the owner supervisor(service1) and with an idle supervisor(falling
+     * back to distributed-data in an inactive datacenter). This covers both the available cases, datacenters and case
+     * in which the node with active akka-singleton is shutdown and another one takes over.
+     */
+    @Test
+    public void testEntityRetrievalWithUnavailableSupervisor() throws Exception {
+        final YangInstanceIdentifier entityId = YangInstanceIdentifier.create(new NodeIdentifier(NetworkTopology.QNAME),
+                new NodeIdentifier(Topology.QNAME),
+                NodeIdentifierWithPredicates.of(Topology.QNAME, QName.create(Topology.QNAME, "topology-id"), "test"),
+                new NodeIdentifier(Node.QNAME),
+                NodeIdentifierWithPredicates.of(Node.QNAME, QName.create(Node.QNAME, "node-id"), "test://test-node"));
+
+        final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
+
+        final DOMEntityOwnershipCandidateRegistration reg = service1.registerCandidate(entity);
+
+        await().untilAsserted(() -> {
+            final RpcResult<GetEntityOutput> getEntityResult = service1.getEntity(new GetEntityInputBuilder()
+                            .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+                            .setType(new EntityType(ENTITY_TYPE))
+                            .build())
+                    .get();
+
+            assertEquals(getEntityResult.getResult().getOwnerNode().getValue(), "member-1");
+            assertEquals(getEntityResult.getResult().getCandidateNodes().get(0).getValue(), "member-1");
+        });
+
+        // keep this under ask timeout to make sure the singleton actor in the inactive datacenter responds with failure
+        // immediately, so that the rpc actor retries with distributed-data asap
+        await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
+            final GetEntitiesOutput getEntitiesResult =
+                    service2.getEntities(new GetEntitiesInputBuilder().build()).get().getResult();
+
+            assertEquals(getEntitiesResult.getEntities().size(), 1);
+            assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+                            new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)),
+                            new EntityType(ENTITY_TYPE)))
+                    .getCandidateNodes().contains(new NodeName("member-1")));
+            assertTrue(getEntitiesResult.getEntities().get(new EntitiesKey(
+                            new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)),
+                            new EntityType(ENTITY_TYPE)))
+                    .getOwnerNode().getValue().equals("member-1"));
+        });
+
+        await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
+            final GetEntityOutput getEntityResult = service2.getEntity(new GetEntityInputBuilder()
+                            .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+                            .setType(new EntityType(ENTITY_TYPE))
+                            .build())
+                    .get().getResult();
+
+            assertEquals(getEntityResult.getOwnerNode().getValue(), "member-1");
+            assertEquals(getEntityResult.getCandidateNodes().get(0).getValue(), "member-1");
+        });
+
+        await().atMost(Duration.ofSeconds(2)).untilAsserted(() -> {
+            final GetEntityOwnerOutput getOwnerResult = service2.getEntityOwner(new GetEntityOwnerInputBuilder()
+                            .setName(new EntityName(CODEC_CONTEXT.fromYangInstanceIdentifier(entityId)))
+                            .setType(new EntityType(ENTITY_TYPE))
+                            .build())
+                    .get().getResult();
+
+            assertEquals(getOwnerResult.getOwnerNode().getValue(), "member-1");
+        });
+
+    }
+}
index b396053e29815683ab15fda72a5fc26095800db5..4bf68a7ef1a46c0749ad618ba0a21054cc58f03e 100644 (file)
@@ -117,8 +117,8 @@ public class OwnerSupervisorTest extends AbstractNativeEosTest {
         private final Map<DOMEntity, String> currentOwners;
 
         private MockSyncer(final ActorContext<OwnerSupervisorCommand> context,
-                          final Map<DOMEntity, Set<String>> currentCandidates,
-                          final Map<DOMEntity, String> currentOwners) {
+                           final Map<DOMEntity, Set<String>> currentCandidates,
+                           final Map<DOMEntity, String> currentOwners) {
             super(context);
             this.currentCandidates = currentCandidates;
             this.currentOwners = currentOwners;
@@ -163,12 +163,14 @@ public class OwnerSupervisorTest extends AbstractNativeEosTest {
 
             listenerRegistry = context.spawn(EntityTypeListenerRegistry.create(role), "ListenerRegistry");
             candidateRegistry = context.spawn(CandidateRegistry.create(), "CandidateRegistry");
-            ownerStateChecker = context.spawn(OwnerStateChecker.create(role), "OwnerStateChecker");
 
             final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
             // start the initial sync behavior that switches to the regular one after syncing
             ownerSupervisor = clusterSingleton.init(SingletonActor.of(
                     MockSyncer.create(currentCandidates, currentOwners), "OwnerSupervisor"));
+
+            ownerStateChecker = context.spawn(OwnerStateChecker.create(role, ownerSupervisor, null),
+                    "OwnerStateChecker");
         }
 
         public static Behavior<BootstrapCommand> create(final Map<DOMEntity, Set<String>> currentCandidates,