X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2FAkkaEntityOwnershipService.java;fp=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2FAkkaEntityOwnershipService.java;h=ce60f6f83d26e85aa5f173f60b24ad97e43014ed;hp=fdb8a4d02f730224fb8668c104085bf6571cc652;hb=fd7bc3ff484b7b0bd72d425d59883f710e397c3f;hpb=5b97f0fb5068bb3eb9b77b4509164707c5f35cd6 diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java index fdb8a4d02f..ce60f6f83d 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; @@ -37,13 +38,21 @@ import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipSt import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand; import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener; import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener; +import org.opendaylight.mdsal.binding.api.RpcProviderService; import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; @@ -51,7 +60,18 @@ import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistratio import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.OdlEntityOwnersService; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.RpcOutput; import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; @@ -66,10 +86,12 @@ import org.slf4j.LoggerFactory; */ @Singleton @Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class }) -public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable { +public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable, + OdlEntityOwnersService { private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class); private static final String DATACENTER_PREFIX = "dc"; private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20); + private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10); private final Set registeredEntities = ConcurrentHashMap.newKeySet(); private final String localCandidate; @@ -83,6 +105,8 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da private final ActorRef ownerStateChecker; protected final ActorRef ownerSupervisor; + private Registration reg; + @VisibleForTesting protected AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException { @@ -111,15 +135,21 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da @Inject @Activate - public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider) - throws ExecutionException, InterruptedException { - this(provider.getActorSystem()); + public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider, + @Reference final RpcProviderService rpcProvider) throws ExecutionException, InterruptedException { + this(actorProvider.getActorSystem()); + + reg = rpcProvider.registerRpcImplementation(OdlEntityOwnersService.class, this); } @PreDestroy @Deactivate @Override public void close() throws InterruptedException, ExecutionException { + if (reg != null) { + reg.close(); + reg = null; + } AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get(); } @@ -185,6 +215,27 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler)); } + + @Override + public ListenableFuture> getEntities(final GetEntitiesInput input) { + return toRpcFuture(AskPattern.ask(ownerSupervisor, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler), + GetEntitiesReply::toOutput); + } + + @Override + public ListenableFuture> getEntity(final GetEntityInput input) { + return toRpcFuture(AskPattern.ask(ownerSupervisor, + (final ActorRef replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler), + GetEntityReply::toOutput); + } + + @Override + public ListenableFuture> getEntityOwner(final GetEntityOwnerInput input) { + return toRpcFuture(AskPattern.ask(ownerSupervisor, + (final ActorRef replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT, + scheduler), GetEntityOwnerReply::toOutput); + } + void unregisterCandidate(final DOMEntity entity) { LOG.debug("Unregistering candidate for {}", entity); @@ -204,11 +255,25 @@ public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, Da return runningContext; } + private static ListenableFuture> toRpcFuture( + final CompletionStage stage, final Function outputFunction) { + + final SettableFuture> future = SettableFuture.create(); + stage.whenComplete((reply, failure) -> { + if (failure != null) { + future.setException(failure); + } else { + future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build()); + } + }); + return future; + } + private static ListenableFuture toListenableFuture(final String op, final CompletionStage stage) { final SettableFuture future = SettableFuture.create(); stage.whenComplete((reply, failure) -> { if (failure != null) { - LOG.warn("{} DataCenter has failed", op, failure); + LOG.warn("{} DataCenter failed", op, failure); future.setException(failure); } else { LOG.debug("{} DataCenter successful", op);