Fix eos entity lookups with YangInstanceIdentifier
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
index 27fbad447413b0dfd70c2cbb6fc186ce26b6cbf7..e815c8a6a0cc886661c2b38e80106b0fd4743150 100644 (file)
@@ -15,12 +15,15 @@ import akka.actor.typed.javadsl.AskPattern;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.cluster.typed.Cluster;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
@@ -33,12 +36,24 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTree;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
@@ -46,6 +61,18 @@ import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistratio
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.OdlEntityOwnersService;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.RpcOutput;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -59,32 +86,47 @@ import org.slf4j.LoggerFactory;
  * the appropriate owners.
  */
 @Singleton
-@Component(immediate = true, service = DOMEntityOwnershipService.class)
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+@Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
+public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable,
+        OdlEntityOwnersService {
     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
     private static final String DATACENTER_PREFIX = "dc";
+    private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
+    private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10);
 
     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
     private final String localCandidate;
     private final Scheduler scheduler;
+    private final String datacenter;
 
     private final ActorRef<BootstrapCommand> bootstrap;
     private final RunningContext runningContext;
     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
     private final ActorRef<StateCheckerCommand> ownerStateChecker;
+    protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+    private final BindingInstanceIdentifierCodec iidCodec;
+
+    private Registration reg;
 
     @VisibleForTesting
-    AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
+    protected AkkaEntityOwnershipService(final ActorSystem actorSystem, final BindingCodecTree codecTree)
+            throws ExecutionException, InterruptedException {
         final var typedActorSystem = Adapter.toTyped(actorSystem);
-
         scheduler = typedActorSystem.scheduler();
-        localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+
+        final Cluster cluster = Cluster.get(typedActorSystem);
+        datacenter = cluster.selfMember().dataCenter();
+
+        localCandidate = cluster.selfMember().getRoles().stream()
             .filter(role -> !role.contains(DATACENTER_PREFIX))
             .findFirst()
             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
 
-        bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
+        iidCodec = codecTree.getInstanceIdentifierCodec();
+        bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(
+                context -> EOSMain.create(iidCodec)), "EOSBootstrap");
 
         final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
                 GetRunningContext::new, Duration.ofSeconds(5), scheduler);
@@ -93,19 +135,27 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
         candidateRegistry = runningContext.getCandidateRegistry();
         listenerRegistry = runningContext.getListenerRegistry();
         ownerStateChecker = runningContext.getOwnerStateChecker();
+        ownerSupervisor = runningContext.getOwnerSupervisor();
     }
 
     @Inject
     @Activate
-    public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
+    public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider,
+            @Reference final RpcProviderService rpcProvider, @Reference final BindingCodecTree codecTree)
             throws ExecutionException, InterruptedException {
-        this(provider.getActorSystem());
+        this(actorProvider.getActorSystem(), codecTree);
+
+        reg = rpcProvider.registerRpcImplementation(OdlEntityOwnersService.class, this);
     }
 
     @PreDestroy
     @Deactivate
     @Override
     public void close() throws InterruptedException, ExecutionException {
+        if (reg != null) {
+            reg.close();
+            reg = null;
+        }
         AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
     }
 
@@ -156,6 +206,42 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
         return registeredEntities.contains(forEntity);
     }
 
+    @Override
+    public ListenableFuture<Empty> activateDataCenter() {
+        LOG.debug("Activating datacenter: {}", datacenter);
+
+        return toListenableFuture("Activate",
+            AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
+    }
+
+    @Override
+    public ListenableFuture<Empty> deactivateDataCenter() {
+        LOG.debug("Deactivating datacenter: {}", datacenter);
+        return toListenableFuture("Deactivate",
+            AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
+    }
+
+
+    @Override
+    public ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
+        return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
+                reply -> reply.toOutput(iidCodec));
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
+        return toRpcFuture(AskPattern.ask(ownerSupervisor,
+            (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
+            GetEntityReply::toOutput);
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
+        return toRpcFuture(AskPattern.ask(ownerSupervisor,
+            (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
+            scheduler), GetEntityOwnerReply::toOutput);
+    }
+
     void unregisterCandidate(final DOMEntity entity) {
         LOG.debug("Unregistering candidate for {}", entity);
 
@@ -174,4 +260,32 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
     RunningContext getRunningContext() {
         return runningContext;
     }
+
+    private static <R extends OwnerSupervisorReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
+            final CompletionStage<R> stage, final Function<R, O> outputFunction) {
+
+        final SettableFuture<RpcResult<O>> future = SettableFuture.create();
+        stage.whenComplete((reply, failure) -> {
+            if (failure != null) {
+                future.setException(failure);
+            } else {
+                future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build());
+            }
+        });
+        return future;
+    }
+
+    private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
+        final SettableFuture<Empty> future = SettableFuture.create();
+        stage.whenComplete((reply, failure) -> {
+            if (failure != null) {
+                LOG.warn("{} DataCenter failed", op, failure);
+                future.setException(failure);
+            } else {
+                LOG.debug("{} DataCenter successful", op);
+                future.set(Empty.getInstance());
+            }
+        });
+        return future;
+    }
 }