import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.typed.Cluster;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
* the appropriate owners.
*/
@Singleton
-@Component(immediate = true, service = DOMEntityOwnershipService.class)
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+@Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class })
+public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
private static final String DATACENTER_PREFIX = "dc";
+ private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
private final String localCandidate;
private final Scheduler scheduler;
+ private final String datacenter;
private final ActorRef<BootstrapCommand> bootstrap;
private final RunningContext runningContext;
private final ActorRef<CandidateRegistryCommand> candidateRegistry;
private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
private final ActorRef<StateCheckerCommand> ownerStateChecker;
+ private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
@VisibleForTesting
AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
final var typedActorSystem = Adapter.toTyped(actorSystem);
-
scheduler = typedActorSystem.scheduler();
- localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+
+ final Cluster cluster = Cluster.get(typedActorSystem);
+ datacenter = cluster.selfMember().dataCenter();
+
+ localCandidate = cluster.selfMember().getRoles().stream()
.filter(role -> !role.contains(DATACENTER_PREFIX))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No valid role found."));
candidateRegistry = runningContext.getCandidateRegistry();
listenerRegistry = runningContext.getListenerRegistry();
ownerStateChecker = runningContext.getOwnerStateChecker();
+ ownerSupervisor = runningContext.getOwnerSupervisor();
}
@Inject
return registeredEntities.contains(forEntity);
}
+ @Override
+ public ListenableFuture<Void> activateDataCenter() {
+ LOG.debug("Activating datacenter: {}", datacenter);
+ final SettableFuture<Void> future = SettableFuture.create();
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
+
+ ask.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ LOG.warn("Activate DataCenter has failed.", failure);
+ future.setException(failure);
+ return;
+ }
+
+ LOG.debug("Activate DataCenter successful.");
+ future.set(null);
+ });
+
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> deactivateDataCenter() {
+ LOG.debug("Deactivating datacenter: {}", datacenter);
+ final SettableFuture<Void> future = SettableFuture.create();
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
+
+ ask.whenComplete((reply, failure) -> {
+ if (failure != null) {
+ LOG.warn("Deactivate DataCenter has failed.", failure);
+ future.setException(failure);
+ return;
+ }
+
+ LOG.debug("Deactivate DataCenter successful.");
+ future.set(null);
+ });
+
+ return future;
+ }
+
void unregisterCandidate(final DOMEntity entity) {
LOG.debug("Unregistering candidate for {}", entity);