Add cluster-admin api for datacenter activation 81/95981/14
authorTomas Cere <tomas.cere@pantheon.tech>
Mon, 3 May 2021 11:26:08 +0000 (13:26 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 30 Jun 2021 08:17:38 +0000 (08:17 +0000)
Add rpcs into cluster-admin so we can activate/deactivate
datacenters for active/backup scenariop.

JIRA: CONTROLLER-1982
Change-Id: Ic68652199d79251fe9b166e47ca06520121213e4
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
18 files changed:
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java
opendaylight/md-sal/sal-cluster-admin-api/src/main/yang/cluster-admin.yang
opendaylight/md-sal/sal-cluster-admin-impl/pom.xml
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/OSGiClusterAdmin.java
opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index 27fbad4..28ada56 100644 (file)
@@ -15,6 +15,8 @@ import akka.actor.typed.javadsl.AskPattern;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.cluster.typed.Cluster;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.Set;
@@ -33,6 +35,10 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -59,27 +65,33 @@ import org.slf4j.LoggerFactory;
  * the appropriate owners.
  */
 @Singleton
-@Component(immediate = true, service = DOMEntityOwnershipService.class)
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, AutoCloseable {
+@Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class })
+public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
     private static final String DATACENTER_PREFIX = "dc";
+    private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
 
     private final Set<DOMEntity> registeredEntities = ConcurrentHashMap.newKeySet();
     private final String localCandidate;
     private final Scheduler scheduler;
+    private final String datacenter;
 
     private final ActorRef<BootstrapCommand> bootstrap;
     private final RunningContext runningContext;
     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
     private final ActorRef<StateCheckerCommand> ownerStateChecker;
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
 
     @VisibleForTesting
     AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
         final var typedActorSystem = Adapter.toTyped(actorSystem);
-
         scheduler = typedActorSystem.scheduler();
-        localCandidate = Cluster.get(typedActorSystem).selfMember().getRoles().stream()
+
+        final Cluster cluster = Cluster.get(typedActorSystem);
+        datacenter = cluster.selfMember().dataCenter();
+
+        localCandidate = cluster.selfMember().getRoles().stream()
             .filter(role -> !role.contains(DATACENTER_PREFIX))
             .findFirst()
             .orElseThrow(() -> new IllegalArgumentException("No valid role found."));
@@ -93,6 +105,7 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
         candidateRegistry = runningContext.getCandidateRegistry();
         listenerRegistry = runningContext.getListenerRegistry();
         ownerStateChecker = runningContext.getOwnerStateChecker();
+        ownerSupervisor = runningContext.getOwnerSupervisor();
     }
 
     @Inject
@@ -156,6 +169,48 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
         return registeredEntities.contains(forEntity);
     }
 
+    @Override
+    public ListenableFuture<Void> activateDataCenter() {
+        LOG.debug("Activating datacenter: {}", datacenter);
+        final SettableFuture<Void> future = SettableFuture.create();
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(ownerSupervisor, ActivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
+
+        ask.whenComplete((reply, failure) -> {
+            if (failure != null) {
+                LOG.warn("Activate DataCenter has failed.", failure);
+                future.setException(failure);
+                return;
+            }
+
+            LOG.debug("Activate DataCenter successful.");
+            future.set(null);
+        });
+
+        return future;
+    }
+
+    @Override
+    public ListenableFuture<Void> deactivateDataCenter() {
+        LOG.debug("Deactivating datacenter: {}", datacenter);
+        final SettableFuture<Void> future = SettableFuture.create();
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(ownerSupervisor, DeactivateDataCenter::new, DATACENTER_OP_TIMEOUT, scheduler);
+
+        ask.whenComplete((reply, failure) -> {
+            if (failure != null) {
+                LOG.warn("Deactivate DataCenter has failed.", failure);
+                future.setException(failure);
+                return;
+            }
+
+            LOG.debug("Deactivate DataCenter successful.");
+            future.set(null);
+        });
+
+        return future;
+    }
+
     void unregisterCandidate(final DOMEntity entity) {
         LOG.debug("Unregistering candidate for {}", entity);
 
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/NativeEosService.java
new file mode 100644 (file)
index 0000000..6445eba
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Service used to bring up/down the NativeEos service in individual datacenters.
+ * Active datacenter in native eos terms means that the candidates from this datacenter can become owners of entities.
+ * Additionally the singleton component makings ownership decisions, runs only in an active datacenter.
+ *
+ * <p>
+ * Caller must make sure that only one datacenter is active at a time, otherwise the singleton actors
+ * in each datacenter will interfere with each other. The methods provided byt this service can be called
+ * on any node from the datacenter to be activated. Datacenters only need to brought up when using non-default
+ * datacenter or multiple datacenters.
+ */
+@Beta
+public interface NativeEosService {
+
+    /**
+     * Activates the native eos service in the datacenter that this method is called.
+     */
+    ListenableFuture<Void> activateDataCenter();
+
+    /**
+     * Deactivates the native eos service in the datacenter that this method is called.
+     */
+    ListenableFuture<Void> deactivateDataCenter();
+}
index 5199d22..9e646fa 100644 (file)
@@ -72,4 +72,4 @@ public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
         request.getReplyTo().tell(Empty.getInstance());
         return Behaviors.stopped();
     }
-}
+}
\ No newline at end of file
index 9cbea5c..1abf6d6 100644 (file)
@@ -37,7 +37,7 @@ public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorComman
         final String datacenterRole = extractDatacenterRole(cluster.selfMember());
         if (datacenterRole.equals(DEFAULT_DATACENTER)) {
             LOG.debug("No datacenter configured, activating default data center");
-            context.getSelf().tell(ActivateDataCenter.INSTANCE);
+            context.getSelf().tell(new ActivateDataCenter(null));
         }
 
         LOG.debug("Idle supervisor started on {}.", cluster.selfMember());
@@ -57,7 +57,7 @@ public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorComman
 
     private Behavior<OwnerSupervisorCommand> onActivateDataCenter(final ActivateDataCenter message) {
         LOG.debug("Received ActivateDataCenter command switching to syncer behavior,");
-        return OwnerSyncer.create();
+        return OwnerSyncer.create(message.getReplyTo());
     }
 
     private String extractDatacenterRole(final Member selfMember) {
index c012afe..91b0634 100644 (file)
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterDeactivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
@@ -154,6 +155,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
 
     private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
         LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+        command.getReplyTo().tell(DataCenterDeactivated.INSTANCE);
         return IdleSupervisor.create();
     }
 
index 1a8df09..a73a562 100644 (file)
@@ -25,9 +25,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.slf4j.Logger;
@@ -49,7 +52,8 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
 
     private int toSync = -1;
 
-    private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context) {
+    private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context,
+                        @Nullable final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
         super(context);
         LOG.debug("Starting candidate and owner sync");
 
@@ -61,10 +65,15 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
             Duration.ofSeconds(5)).askGet(
                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
                 InitialCandidateSync::new);
+
+        if (notifyDatacenterStarted != null) {
+            notifyDatacenterStarted.tell(DataCenterActivated.INSTANCE);
+        }
     }
 
-    public static Behavior<OwnerSupervisorCommand> create() {
-        return Behaviors.setup(OwnerSyncer::new);
+    public static Behavior<OwnerSupervisorCommand> create(
+            final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
+        return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted));
     }
 
     @Override
index 96388d3..29e8502 100644 (file)
@@ -7,14 +7,20 @@
  */
 package org.opendaylight.controller.eos.akka.owner.supervisor.command;
 
+import akka.actor.typed.ActorRef;
 import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
 
 public final class ActivateDataCenter extends OwnerSupervisorCommand implements Serializable {
-    public static final ActivateDataCenter INSTANCE = new ActivateDataCenter();
-
     private static final long serialVersionUID = 1L;
 
-    private ActivateDataCenter() {
-        // NOOP
+    private final ActorRef<OwnerSupervisorReply> replyTo;
+
+    public ActivateDataCenter(final @Nullable ActorRef<OwnerSupervisorReply> replyTo) {
+        this.replyTo = replyTo;
+    }
+
+    public ActorRef<OwnerSupervisorReply> getReplyTo() {
+        return replyTo;
     }
 }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterActivated.java
new file mode 100644 (file)
index 0000000..a82bf22
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+public final class DataCenterActivated extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final DataCenterActivated INSTANCE = new DataCenterActivated();
+
+    private DataCenterActivated() {
+        // NOOP
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DataCenterDeactivated.java
new file mode 100644 (file)
index 0000000..4879fc7
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+public final class DataCenterDeactivated extends OwnerSupervisorReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final DataCenterDeactivated INSTANCE = new DataCenterDeactivated();
+
+    private DataCenterDeactivated() {
+        // NOOP
+    }
+}
index eb728eb..039bfc5 100644 (file)
@@ -7,13 +7,20 @@
  */
 package org.opendaylight.controller.eos.akka.owner.supervisor.command;
 
+import akka.actor.typed.ActorRef;
 import java.io.Serializable;
+import org.eclipse.jdt.annotation.Nullable;
 
 public final class DeactivateDataCenter extends OwnerSupervisorCommand implements Serializable {
-    public static final DeactivateDataCenter INSTANCE = new DeactivateDataCenter();
     private static final long serialVersionUID = 1L;
 
-    private DeactivateDataCenter() {
-        // NOOP
+    private final ActorRef<OwnerSupervisorReply> replyTo;
+
+    public DeactivateDataCenter(final @Nullable ActorRef<OwnerSupervisorReply> replyTo) {
+        this.replyTo = replyTo;
+    }
+
+    public ActorRef<OwnerSupervisorReply> getReplyTo() {
+        return replyTo;
     }
 }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/OwnerSupervisorReply.java
new file mode 100644 (file)
index 0000000..c34d441
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+public abstract class OwnerSupervisorReply {
+    OwnerSupervisorReply() {
+        // Hidden on purpose
+    }
+}
index 201d6c6..ee1782a 100644 (file)
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateD
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -234,12 +236,22 @@ public abstract class AbstractNativeEosTest {
         });
     }
 
-    protected static void activateDatacenter(final ClusterNode clusterNode) {
-        clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
+    protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(clusterNode.getOwnerSupervisor(),
+                        ActivateDataCenter::new,
+                        Duration.ofSeconds(20),
+                        clusterNode.actorSystem.scheduler());
+        return ask.toCompletableFuture();
     }
 
-    protected static void deactivateDatacenter(final ClusterNode clusterNode) {
-        clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
+    protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(clusterNode.getOwnerSupervisor(),
+                        DeactivateDataCenter::new,
+                        Duration.ofSeconds(20),
+                        clusterNode.actorSystem.scheduler());
+        return ask.toCompletableFuture();
     }
 
     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
index 440229e..28566c7 100644 (file)
@@ -62,7 +62,7 @@ public class DataCentersTest extends AbstractNativeEosTest {
         registerCandidates(node3, ENTITY_1, "member-3");
         registerCandidates(node4, ENTITY_1, "member-4");
 
-        activateDatacenter(node1);
+        activateDatacenter(node1).get();
 
         waitUntillOwnerPresent(node1, ENTITY_1);
         final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
@@ -76,8 +76,8 @@ public class DataCentersTest extends AbstractNativeEosTest {
         verifyListenerState(listener1, ENTITY_1, false, false, true);
         verifyListenerState(listener2, ENTITY_1, false, false, false);
 
-        deactivateDatacenter(node1);
-        activateDatacenter(node4);
+        deactivateDatacenter(node1).get();
+        activateDatacenter(node4).get();
 
         verifyListenerState(listener1, ENTITY_1, true, false, false);
         verifyListenerState(listener2, ENTITY_1, true, true, false);
@@ -88,8 +88,8 @@ public class DataCentersTest extends AbstractNativeEosTest {
         verifyListenerState(listener1, ENTITY_1, true, false, false);
         verifyListenerState(listener2, ENTITY_1, true, false, true);
 
-        deactivateDatacenter(node3);
-        activateDatacenter(node2);
+        deactivateDatacenter(node3).get();
+        activateDatacenter(node2).get();
 
         // no candidate in dc-primary so no owners after datacenter activation
         verifyListenerState(listener1, ENTITY_1, false, false, false);
@@ -97,12 +97,12 @@ public class DataCentersTest extends AbstractNativeEosTest {
     }
 
     @Test
-    public void testDataCenterShutdown() {
+    public void testDataCenterShutdown() throws Exception {
         registerCandidates(node1, ENTITY_1, "member-1");
         registerCandidates(node3, ENTITY_1, "member-3");
         registerCandidates(node4, ENTITY_1, "member-4");
 
-        activateDatacenter(node1);
+        activateDatacenter(node1).get();
 
         waitUntillOwnerPresent(node1, ENTITY_1);
         final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
@@ -119,7 +119,7 @@ public class DataCentersTest extends AbstractNativeEosTest {
         ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
         ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
 
-        activateDatacenter(node3);
+        activateDatacenter(node3).get();
         verifyListenerState(listener2, ENTITY_1, true, true, false);
 
         unregisterCandidates(node3, ENTITY_1, "member-3");
index 607bff6..25c8847 100644 (file)
@@ -261,4 +261,18 @@ module cluster-admin {
             }
         }
     }
+
+    rpc activate-eos-datacenter {
+        description "Activates the datacenter that the node this rpc is called on belongs to. The caller must maintain
+                     only a single active datacenter at a time as the singleton components will interfere with each
+                     other otherwise. This only needs to be used if configuring multiple datacenters or if not using
+                     default datacenter.";
+    }
+
+    rpc deactivate-eos-datacenter {
+        description "Deactivates the datacenter that the node this rpc is called on belongs to. The caller must maintain
+                     only a single active datacenter at a time as the singleton components will interfere with each
+                     other otherwise. This only needs to be used if configuring multiple datacenters or if not using
+                     default datacenter.";
+    }
 }
index b9c8bf8..52a5ac7 100644 (file)
     </dependency>
 
     <!-- OpenDaylight -->
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>eos-dom-akka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.mdsal</groupId>
       <artifactId>mdsal-binding-api</artifactId>
index cc1152b..f11c60f 100644 (file)
@@ -54,7 +54,10 @@ import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.eos.akka.NativeEosService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
@@ -72,6 +75,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
@@ -130,10 +135,12 @@ public class ClusterAdminRpcService implements ClusterAdminService {
     private final DistributedDataStoreInterface operDataStore;
     private final BindingNormalizedNodeSerializer serializer;
     private final Timeout makeLeaderLocalTimeout;
+    private final NativeEosService nativeEosService;
 
     public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore,
-            final DistributedDataStoreInterface operDataStore,
-            final BindingNormalizedNodeSerializer serializer) {
+                                  final DistributedDataStoreInterface operDataStore,
+                                  final BindingNormalizedNodeSerializer serializer,
+                                  final NativeEosService nativeEosService) {
         this.configDataStore = configDataStore;
         this.operDataStore = operDataStore;
         this.serializer = serializer;
@@ -141,6 +148,8 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         this.makeLeaderLocalTimeout =
                 new Timeout(configDataStore.getActorUtils().getDatastoreContext()
                         .getShardLeaderElectionTimeout().duration().$times(2));
+
+        this.nativeEosService = nativeEosService;
     }
 
     @Override
@@ -514,6 +523,50 @@ public class ClusterAdminRpcService implements ClusterAdminService {
             MoreExecutors.directExecutor());
     }
 
+    @Override
+    public ListenableFuture<RpcResult<ActivateEosDatacenterOutput>> activateEosDatacenter(
+            final ActivateEosDatacenterInput input) {
+        LOG.debug("Activating EOS Datacenter");
+        final SettableFuture<RpcResult<ActivateEosDatacenterOutput>> future = SettableFuture.create();
+        Futures.addCallback(nativeEosService.activateDataCenter(), new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.debug("Successfully activated datacenter.");
+                future.set(RpcResultBuilder.<ActivateEosDatacenterOutput>success().build());
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                future.set(ClusterAdminRpcService.<ActivateEosDatacenterOutput>newFailedRpcResultBuilder(
+                        "Failed to activate datacenter.", failure).build());
+            }
+        }, MoreExecutors.directExecutor());
+
+        return future;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<DeactivateEosDatacenterOutput>> deactivateEosDatacenter(
+            final DeactivateEosDatacenterInput input) {
+        LOG.debug("Deactivating EOS Datacenter");
+        final SettableFuture<RpcResult<DeactivateEosDatacenterOutput>> future = SettableFuture.create();
+        Futures.addCallback(nativeEosService.deactivateDataCenter(), new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.debug("Successfully deactivated datacenter.");
+                future.set(RpcResultBuilder.<DeactivateEosDatacenterOutput>success().build());
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                future.set(ClusterAdminRpcService.<DeactivateEosDatacenterOutput>newFailedRpcResultBuilder(
+                        "Failed to deactivate datacenter.", failure).build());
+            }
+        }, MoreExecutors.directExecutor());
+
+        return future;
+    }
+
     private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
             final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
         final Map<ShardResultKey, ShardResult> result = Maps.newHashMapWithExpectedSize(allShardReplies.size());
index 5280ad5..c0e9448 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore.admin;
 
 import com.google.common.annotations.Beta;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.eos.akka.NativeEosService;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
@@ -33,13 +34,15 @@ public final class OSGiClusterAdmin {
     BindingNormalizedNodeSerializer serializer = null;
     @Reference
     RpcProviderService rpcProviderService = null;
+    @Reference
+    NativeEosService nativeEosService = null;
 
     private ObjectRegistration<?> reg;
 
     @Activate
     void activate() {
         reg = rpcProviderService.registerRpcImplementation(ClusterAdminService.class,
-            new ClusterAdminRpcService(configDatastore, operDatastore, serializer));
+            new ClusterAdminRpcService(configDatastore, operDatastore, serializer, nativeEosService));
         LOG.info("Cluster Admin services started");
     }
 
index f90a428..ba00174 100644 (file)
@@ -127,7 +127,8 @@ public class ClusterAdminRpcServiceTest {
         String fileName = "target/testBackupDatastore";
         new File(fileName).delete();
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
+        final ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(),
+                null, null);
 
         RpcResult<BackupDatastoreOutput> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
                 .setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
@@ -291,8 +292,8 @@ public class ClusterAdminRpcServiceTest {
         MemberNode memberNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                 .moduleShardsConfig("module-shards-cars-member-1.conf").build();
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
-                memberNode.operDataStore(), null);
+        final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+                memberNode.operDataStore(), null, null);
 
         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
                 .setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
@@ -335,8 +336,8 @@ public class ClusterAdminRpcServiceTest {
             final String... peerMemberNames) throws Exception {
         memberNode.waitForMembersUp(peerMemberNames);
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
-                memberNode.operDataStore(), null);
+        final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+                memberNode.operDataStore(), null, null);
 
         RpcResult<AddShardReplicaOutput> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
             .setShardName(shardName).setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
@@ -356,8 +357,8 @@ public class ClusterAdminRpcServiceTest {
 
     private static void doMakeShardLeaderLocal(final MemberNode memberNode, final String shardName,
             final String newLeader) throws Exception {
-        ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
-                memberNode.operDataStore(), null);
+        final ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+                memberNode.operDataStore(), null, null);
 
         final RpcResult<MakeLeaderLocalOutput> rpcResult = service.makeLeaderLocal(new MakeLeaderLocalInputBuilder()
                 .setDataStoreType(DataStoreType.Config).setShardName(shardName).build())
@@ -412,8 +413,8 @@ public class ClusterAdminRpcServiceTest {
 
         // Invoke RPC service on member-3 to remove it's local shard
 
-        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
-                replicaNode3.operDataStore(), null);
+        final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore(), null, null);
 
         RpcResult<RemoveShardReplicaOutput> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
                 .setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
@@ -437,8 +438,8 @@ public class ClusterAdminRpcServiceTest {
 
         // Invoke RPC service on member-1 to remove member-2
 
-        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
-                leaderNode1.operDataStore(), null);
+        final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore(), null, null);
 
         rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
                 .setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
@@ -473,8 +474,8 @@ public class ClusterAdminRpcServiceTest {
 
         // Invoke RPC service on leader member-1 to remove it's local shard
 
-        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
-                leaderNode1.operDataStore(), null);
+        final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore(), null, null);
 
         RpcResult<RemoveShardReplicaOutput> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
                 .setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
@@ -521,8 +522,8 @@ public class ClusterAdminRpcServiceTest {
                                 newReplicaNode2.kit().getRef());
         newReplicaNode2.kit().expectMsgClass(Success.class);
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
-                newReplicaNode2.operDataStore(), null);
+        final ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
+                newReplicaNode2.operDataStore(), null, null);
 
         RpcResult<AddReplicasForAllShardsOutput> rpcResult = service.addReplicasForAllShards(
             new AddReplicasForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
@@ -579,8 +580,8 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftPeersPresent(replicaNode2.configDataStore(), "pets", "member-1", "member-3");
         verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
 
-        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
-                replicaNode3.operDataStore(), null);
+        final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore(), null, null);
 
         RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
                 new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
@@ -625,8 +626,8 @@ public class ClusterAdminRpcServiceTest {
 
         // Invoke RPC service on member-3 to change voting status
 
-        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
-                replicaNode3.operDataStore(), null);
+        final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore(), null, null);
 
         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service3
                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
@@ -659,8 +660,8 @@ public class ClusterAdminRpcServiceTest {
 
         // Invoke RPC service on member-3 to change voting status
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
-                leaderNode.operDataStore(), null);
+        final ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
+                leaderNode.operDataStore(), null, null);
 
         RpcResult<ChangeMemberVotingStatesForShardOutput> rpcResult = service
                 .changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
@@ -701,8 +702,8 @@ public class ClusterAdminRpcServiceTest {
 
         // Invoke RPC service on member-3 to change voting status
 
-        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
-                replicaNode3.operDataStore(), null);
+        final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore(), null, null);
 
         RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
                 new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(List.of(
@@ -753,8 +754,8 @@ public class ClusterAdminRpcServiceTest {
         verifyVotingStates(leaderNode1.configDataStore(), "cars", new SimpleEntry<>("member-1", TRUE),
                 new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
 
-        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
-                replicaNode3.operDataStore(), null);
+        final ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore(), null, null);
 
         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards(
             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
@@ -848,8 +849,8 @@ public class ClusterAdminRpcServiceTest {
         verifyRaftState(replicaNode1.configDataStore(), "cars", raftState ->
             assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
 
-        ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
-                replicaNode1.operDataStore(), null);
+        final ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
+                replicaNode1.operDataStore(), null, null);
 
         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
@@ -915,8 +916,8 @@ public class ClusterAdminRpcServiceTest {
                 new SimpleEntry<>("member-4", FALSE), new SimpleEntry<>("member-5", FALSE),
                 new SimpleEntry<>("member-6", FALSE));
 
-        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
-                leaderNode1.operDataStore(), null);
+        final ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore(), null, null);
 
         RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards(
             new FlipMemberVotingStatesForAllShardsInputBuilder().build()).get(10, TimeUnit.SECONDS);
@@ -954,7 +955,7 @@ public class ClusterAdminRpcServiceTest {
                             type + datastoreTypeSuffix).toString(), info.isVoting()));
                 }
 
-                String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
+                final String shardID = ShardIdentifier.create(shard, MemberName.forName(member),
                         type + datastoreTypeSuffix).toString();
                 InMemoryJournal.addEntry(shardID, 1, new UpdateElectionTerm(1, null));
                 InMemoryJournal.addEntry(shardID, 2, new SimpleReplicatedLogEntry(0, 1,