Rename NativeEosService to DataCenterControl
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / AkkaEntityOwnershipService.java
index 27fbad447413b0dfd70c2cbb6fc186ce26b6cbf7..fdb8a4d02f730224fb8668c104085bf6571cc652 100644 (file)
@@ -15,6 +15,8 @@ import akka.actor.typed.javadsl.AskPattern;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.cluster.typed.Cluster;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
@@ -33,6 +35,9 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -46,6 +51,7 @@ import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistratio
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -59,27 +65,34 @@ import org.slf4j.LoggerFactory;
  * the appropriate owners.
  */
 @Singleton
-@Component(immediate = true, service = DOMEntityOwnershipService.class)
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+@Component(immediate = true, service = { DOMEntityOwnershipService.class, DataCenterControl.class })
+public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, DataCenterControl, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
     private static final String DATACENTER_PREFIX = "dc";
+    private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
 
     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
     private final String localCandidate;
     private final Scheduler scheduler;
+    private final String datacenter;
 
     private final ActorRef<BootstrapCommand> bootstrap;
     private final RunningContext runningContext;
     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
     private final ActorRef<StateCheckerCommand> ownerStateChecker;
+    protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
 
     @VisibleForTesting
-    AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
+    protected AkkaEntityOwnershipService(final ActorSystem actorSystem)
+            throws ExecutionException, InterruptedException {
         final var typedActorSystem = Adapter.toTyped(actorSystem);
-
         scheduler = typedActorSystem.scheduler();
-        localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+
+        final Cluster cluster = Cluster.get(typedActorSystem);
+        datacenter = cluster.selfMember().dataCenter();
+
+        localCandidate = cluster.selfMember().getRoles().stream()
             .filter(role -> !role.contains(DATACENTER_PREFIX))
             .findFirst()
             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
@@ -93,6 +106,7 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
         candidateRegistry = runningContext.getCandidateRegistry();
         listenerRegistry = runningContext.getListenerRegistry();
         ownerStateChecker = runningContext.getOwnerStateChecker();
+        ownerSupervisor = runningContext.getOwnerSupervisor();
     }
 
     @Inject
@@ -156,6 +170,21 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
         return registeredEntities.contains(forEntity);
     }
 
+    @Override
+    public ListenableFuture<Empty> activateDataCenter() {
+        LOG.debug("Activating datacenter: {}", datacenter);
+
+        return toListenableFuture("Activate",
+            AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
+    }
+
+    @Override
+    public ListenableFuture<Empty> deactivateDataCenter() {
+        LOG.debug("Deactivating datacenter: {}", datacenter);
+        return toListenableFuture("Deactivate",
+            AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler));
+    }
+
     void unregisterCandidate(final DOMEntity entity) {
         LOG.debug("Unregistering candidate for {}", entity);
 
@@ -174,4 +203,18 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
     RunningContext getRunningContext() {
         return runningContext;
     }
+
+    private static ListenableFuture<Empty> toListenableFuture(final String op, final CompletionStage<?> stage) {
+        final SettableFuture<Empty> future = SettableFuture.create();
+        stage.whenComplete((reply, failure) -> {
+            if (failure != null) {
+                LOG.warn("{} DataCenter has failed", op, failure);
+                future.setException(failure);
+            } else {
+                LOG.debug("{} DataCenter successful", op);
+                future.set(Empty.getInstance());
+            }
+        });
+        return future;
+    }
 }