Bump versions 9.0.4-SNAPSHOT
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
index ce60f6f83d26e85aa5f173f60b24ad97e43014ed..332fb44af7575423869362d3132a064e4fd226a2 100644 (file)
@@ -17,6 +17,7 @@ import akka.cluster.typed.Cluster;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
@@ -33,19 +34,18 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
-import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -53,20 +53,22 @@ import org.opendaylight.controller.eos.akka.registry.listener.type.command.Regis
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTree;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntity;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwner;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.OdlEntityOwnersService;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.RpcOutput;
 import org.opendaylight.yangtools.yang.common.Empty;
@@ -86,8 +88,7 @@ import org.slf4j.LoggerFactory;
  */
 @Singleton
 @Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
-public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable,
-        OdlEntityOwnersService {
+public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
     private static final String DATACENTER_PREFIX = "dc";
     private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
@@ -105,10 +106,12 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
     private final ActorRef<StateCheckerCommand> ownerStateChecker;
     protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
 
+    private final BindingInstanceIdentifierCodec iidCodec;
+
     private Registration reg;
 
     @VisibleForTesting
-    protected AkkaEntityOwnershipService(final ActorSystem actorSystem)
+    protected AkkaEntityOwnershipService(final ActorSystem actorSystem, final BindingCodecTree codecTree)
             throws ExecutionException, InterruptedException {
         final var typedActorSystem = Adapter.toTyped(actorSystem);
         scheduler = typedActorSystem.scheduler();
@@ -121,7 +124,9 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
             .findFirst()
             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
 
-        bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
+        iidCodec = codecTree.getInstanceIdentifierCodec();
+        bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(
+                context -> EOSMain.create(iidCodec)), "EOSBootstrap");
 
         final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
                 GetRunningContext::new, Duration.ofSeconds(5), scheduler);
@@ -135,11 +140,17 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
 
     @Inject
     @Activate
+    @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
+        justification = "Non-final for testing 'this' reference is expected to be stable at registration time")
     public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider,
-            @Reference final RpcProviderService rpcProvider) throws ExecutionException, InterruptedException {
-        this(actorProvider.getActorSystem());
+            @Reference final RpcProviderService rpcProvider, @Reference final BindingCodecTree codecTree)
+            throws ExecutionException, InterruptedException {
+        this(actorProvider.getActorSystem(), codecTree);
 
-        reg = rpcProvider.registerRpcImplementation(OdlEntityOwnersService.class, this);
+        reg = rpcProvider.registerRpcImplementations(
+            (GetEntity) this::getEntity,
+            (GetEntities) this::getEntities,
+            (GetEntityOwner) this::getEntityOwner);
     }
 
     @PreDestroy
@@ -154,7 +165,7 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
     }
 
     @Override
-    public DOMEntityOwnershipCandidateRegistration registerCandidate(final DOMEntity entity)
+    public Registration registerCandidate(final DOMEntity entity)
             throws CandidateAlreadyRegisteredException {
         if (!registeredEntities.add(entity)) {
             throw new CandidateAlreadyRegisteredException(entity);
@@ -168,8 +179,7 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
     }
 
     @Override
-    public DOMEntityOwnershipListenerRegistration registerListener(final String entityType,
-                                                                   final DOMEntityOwnershipListener listener) {
+    public Registration registerListener(final String entityType, final DOMEntityOwnershipListener listener) {
         LOG.debug("Registering listener {} for type {}", listener, entityType);
         listenerRegistry.tell(new RegisterListener(entityType, listener));
 
@@ -215,23 +225,22 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
             AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
     }
 
-
-    @Override
-    public ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
-        return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
-            GetEntitiesReply::toOutput);
+    @VisibleForTesting
+    final ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
+        return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
+                reply -> reply.toOutput(iidCodec));
     }
 
-    @Override
-    public ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
-        return toRpcFuture(AskPattern.ask(ownerSupervisor,
+    @VisibleForTesting
+    final ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
+        return toRpcFuture(AskPattern.ask(ownerStateChecker,
             (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
             GetEntityReply::toOutput);
     }
 
-    @Override
-    public ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
-        return toRpcFuture(AskPattern.ask(ownerSupervisor,
+    @VisibleForTesting
+    final ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
+        return toRpcFuture(AskPattern.ask(ownerStateChecker,
             (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
             scheduler), GetEntityOwnerReply::toOutput);
     }
@@ -255,7 +264,7 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
         return runningContext;
     }
 
-    private static <R extends OwnerSupervisorReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
+    private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
             final CompletionStage<R> stage, final Function<R, O> outputFunction) {
 
         final SettableFuture<RpcResult<O>> future = SettableFuture.create();
@@ -277,7 +286,7 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da
                 future.setException(failure);
             } else {
                 LOG.debug("{} DataCenter successful", op);
-                future.set(Empty.getInstance());
+                future.set(Empty.value());
             }
         });
         return future;