import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.typed.Cluster;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityReply;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.UnregisterListener;
+import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTree;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntitiesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.GetEntityOwnerOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.entity.owners.norev.OdlEntityOwnersService;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.RpcOutput;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
* the appropriate owners.
*/
@Singleton
-@Component(immediate = true, service = DOMEntityOwnershipService.class)
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+@Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
+public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable,
+ OdlEntityOwnersService {
private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
private static final String DATACENTER_PREFIX = "dc";
+ private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
+ private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(10);
private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
private final String localCandidate;
private final Scheduler scheduler;
+ private final String datacenter;
private final ActorRef<BootstrapCommand> bootstrap;
private final RunningContext runningContext;
private final ActorRef<CandidateRegistryCommand> candidateRegistry;
private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
private final ActorRef<StateCheckerCommand> ownerStateChecker;
+ protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+
+ private final BindingInstanceIdentifierCodec iidCodec;
+
+ private Registration reg;
@VisibleForTesting
- AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
+ protected AkkaEntityOwnershipService(final ActorSystem actorSystem, final BindingCodecTree codecTree)
+ throws ExecutionException, InterruptedException {
final var typedActorSystem = Adapter.toTyped(actorSystem);
-
scheduler = typedActorSystem.scheduler();
- localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+
+ final Cluster cluster = Cluster.get(typedActorSystem);
+ datacenter = cluster.selfMember().dataCenter();
+
+ localCandidate = cluster.selfMember().getRoles().stream()
.filter(role -> !role.contains(DATACENTER_PREFIX))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No valid role found."));
- bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(context -> EOSMain.create()), "EOSBootstrap");
+ iidCodec = codecTree.getInstanceIdentifierCodec();
+ bootstrap = Adapter.spawn(actorSystem, Behaviors.setup(
+ context -> EOSMain.create(iidCodec)), "EOSBootstrap");
final CompletionStage<RunningContext> ask = AskPattern.ask(bootstrap,
GetRunningContext::new, Duration.ofSeconds(5), scheduler);
candidateRegistry = runningContext.getCandidateRegistry();
listenerRegistry = runningContext.getListenerRegistry();
ownerStateChecker = runningContext.getOwnerStateChecker();
+ ownerSupervisor = runningContext.getOwnerSupervisor();
}
@Inject
@Activate
- public AkkaEntityOwnershipService(@Reference final ActorSystemProvider provider)
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
+ justification = "Non-final for testing 'this' reference is expected to be stable at registration time")
+ public AkkaEntityOwnershipService(@Reference final ActorSystemProvider actorProvider,
+ @Reference final RpcProviderService rpcProvider, @Reference final BindingCodecTree codecTree)
throws ExecutionException, InterruptedException {
- this(provider.getActorSystem());
+ this(actorProvider.getActorSystem(), codecTree);
+
+ reg = rpcProvider.registerRpcImplementation(OdlEntityOwnersService.class, this);
}
@PreDestroy
@Deactivate
@Override
public void close() throws InterruptedException, ExecutionException {
+ if (reg != null) {
+ reg.close();
+ reg = null;
+ }
AskPattern.ask(bootstrap, Terminate::new, Duration.ofSeconds(5), scheduler).toCompletableFuture().get();
}
return registeredEntities.contains(forEntity);
}
+ @Override
+ public ListenableFuture<Empty> activateDataCenter() {
+ LOG.debug("Activating datacenter: {}", datacenter);
+
+ return toListenableFuture("Activate",
+ AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
+ }
+
+ @Override
+ public ListenableFuture<Empty> deactivateDataCenter() {
+ LOG.debug("Deactivating datacenter: {}", datacenter);
+ return toListenableFuture("Deactivate",
+ AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetEntitiesOutput>> getEntities(final GetEntitiesInput input) {
+ return toRpcFuture(AskPattern.ask(ownerStateChecker, GetEntitiesRequest::new, QUERY_TIMEOUT, scheduler),
+ reply -> reply.toOutput(iidCodec));
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetEntityOutput>> getEntity(final GetEntityInput input) {
+ return toRpcFuture(AskPattern.ask(ownerStateChecker,
+ (final ActorRef<GetEntityReply> replyTo) -> new GetEntityRequest(replyTo, input), QUERY_TIMEOUT, scheduler),
+ GetEntityReply::toOutput);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<GetEntityOwnerOutput>> getEntityOwner(final GetEntityOwnerInput input) {
+ return toRpcFuture(AskPattern.ask(ownerStateChecker,
+ (final ActorRef<GetEntityOwnerReply> replyTo) -> new GetEntityOwnerRequest(replyTo, input), QUERY_TIMEOUT,
+ scheduler), GetEntityOwnerReply::toOutput);
+ }
+
void unregisterCandidate(final DOMEntity entity) {
LOG.debug("Unregistering candidate for {}", entity);
RunningContext getRunningContext() {
return runningContext;
}
+
+ private static <R extends StateCheckerReply, O extends RpcOutput> ListenableFuture<RpcResult<O>> toRpcFuture(
+ final CompletionStage<R> stage, final Function<R, O> outputFunction) {
+
+ final SettableFuture<RpcResult<O>> future = SettableFuture.create();
+ stage.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ future.setException(failure);
+ } else {
+ future.set(RpcResultBuilder.success(outputFunction.apply(reply)).build());
+ }
+ });
+ return future;
+ }
+
+ private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
+ final SettableFuture<Empty> future = SettableFuture.create();
+ stage.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ LOG.warn("{} DataCenter failed", op, failure);
+ future.setException(failure);
+ } else {
+ LOG.debug("{} DataCenter successful", op);
+ future.set(Empty.value());
+ }
+ });
+ return future;
+ }
}