Use akka-multi-dc in eos native 80/95980/18
authorTomas Cere <tomas.cere@pantheon.tech>
Fri, 30 Apr 2021 10:05:55 +0000 (12:05 +0200)
committerRobert Varga <nite@hq.sk>
Tue, 29 Jun 2021 09:31:18 +0000 (09:31 +0000)
Add the ability to activate/deactivate singleton supervisor.
Make the supervisor pick owners only from the active datacenter.
All in all this should give us the building blocks needed for
the active/backup cluster use-case.

JIRA: CONTROLLER-1982
Change-Id: Iae0a4a59cc461c8187c59bb8a5a66c73dee5be4c
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/bootstrap/EOSMain.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisor.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipServiceTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java [new file with mode: 0644]
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/SingleNodeTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/ThreeNodeReachabilityTest.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSupervisorTest.java

index 54f9a6b1487bf9657a049d53910f2b6c9bca11d9..5199d22ed589174d8c37dd31640e745cfbeacd1a 100644 (file)
@@ -22,7 +22,7 @@ import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
 import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
 import org.opendaylight.controller.eos.akka.bootstrap.command.Terminate;
 import org.opendaylight.controller.eos.akka.owner.checker.OwnerStateChecker;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
-import org.opendaylight.controller.eos.akka.owner.supervisor.OwnerSyncer;
+import org.opendaylight.controller.eos.akka.owner.supervisor.IdleSupervisor;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
@@ -47,7 +47,7 @@ public final class EOSMain extends AbstractBehavior<BootstrapCommand> {
 
         final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
         // start the initial sync behavior that switches to the regular one after syncing
 
         final ClusterSingleton clusterSingleton = ClusterSingleton.get(context.getSystem());
         // start the initial sync behavior that switches to the regular one after syncing
-        ownerSupervisor = clusterSingleton.init(SingletonActor.of(OwnerSyncer.create(), "OwnerSupervisor"));
+        ownerSupervisor = clusterSingleton.init(SingletonActor.of(IdleSupervisor.create(), "OwnerSupervisor"));
     }
 
     public static Behavior<BootstrapCommand> create() {
     }
 
     public static Behavior<BootstrapCommand> create() {
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/IdleSupervisor.java
new file mode 100644 (file)
index 0000000..9cbea5c
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.Member;
+import akka.cluster.typed.Cluster;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initial Supervisor behavior that stays idle and only switches itself to the active behavior when its running
+ * in the primary datacenter, or is activated on demand. Once the supervisor instance is no longer needed in the
+ * secondary datacenter it needs to be deactivated manually.
+ */
+public final class IdleSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(IdleSupervisor.class);
+
+    private static final String DATACENTER_PREFIX = "dc-";
+    private static final String DEFAULT_DATACENTER = "dc-default";
+
+    private IdleSupervisor(final ActorContext<OwnerSupervisorCommand> context) {
+        super(context);
+        final Cluster cluster = Cluster.get(context.getSystem());
+
+        final String datacenterRole = extractDatacenterRole(cluster.selfMember());
+        if (datacenterRole.equals(DEFAULT_DATACENTER)) {
+            LOG.debug("No datacenter configured, activating default data center");
+            context.getSelf().tell(ActivateDataCenter.INSTANCE);
+        }
+
+        LOG.debug("Idle supervisor started on {}.", cluster.selfMember());
+    }
+
+    public static Behavior<OwnerSupervisorCommand> create() {
+
+        return Behaviors.setup(IdleSupervisor::new);
+    }
+
+    @Override
+    public Receive<OwnerSupervisorCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(ActivateDataCenter.class, this::onActivateDataCenter)
+                .build();
+    }
+
+    private Behavior<OwnerSupervisorCommand> onActivateDataCenter(final ActivateDataCenter message) {
+        LOG.debug("Received ActivateDataCenter command switching to syncer behavior,");
+        return OwnerSyncer.create();
+    }
+
+    private String extractDatacenterRole(final Member selfMember) {
+        return selfMember.getRoles().stream()
+                .filter(role -> role.startsWith(DATACENTER_PREFIX))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(selfMember + " does not have a valid role"));
+    }
+}
index 41c83d4723d5425cca424871f54d3a95e4d83880..c012afe6a1dc90f8860fca275d0f558f78c347fc 100644 (file)
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.CandidatesChanged;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberDownEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
@@ -60,7 +61,7 @@ import scala.collection.JavaConverters;
 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
 
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
 public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorCommand> {
 
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSupervisor.class);
-    private static final String DATACENTER_PREFIX = "dc";
+    private static final String DATACENTER_PREFIX = "dc-";
 
     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
 
 
     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
 
@@ -71,6 +72,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
 
     private final Cluster cluster;
     private final SelfUniqueAddress node;
 
     private final Cluster cluster;
     private final SelfUniqueAddress node;
+    private final String dataCenter;
 
     private final Set<String> activeMembers;
 
 
     private final Set<String> activeMembers;
 
@@ -89,11 +91,12 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
         final DistributedData distributedData = DistributedData.get(context.getSystem());
         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
 
         final DistributedData distributedData = DistributedData.get(context.getSystem());
         final ActorRef<Replicator.Command> replicator = distributedData.replicator();
 
-        this.cluster = Cluster.get(context.getSystem());
-        this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+        cluster = Cluster.get(context.getSystem());
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+        dataCenter = extractDatacenterRole(cluster.selfMember());
 
 
-        this.node = distributedData.selfUniqueAddress();
-        this.activeMembers = getActiveMembers(cluster);
+        node = distributedData.selfUniqueAddress();
+        activeMembers = getActiveMembers();
 
         this.currentCandidates = currentCandidates;
         this.currentOwners = currentOwners;
 
         this.currentCandidates = currentCandidates;
         this.currentOwners = currentOwners;
@@ -141,6 +144,7 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     public Receive<OwnerSupervisorCommand> createReceive() {
         return newReceiveBuilder()
                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
     public Receive<OwnerSupervisorCommand> createReceive() {
         return newReceiveBuilder()
                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
+                .onMessage(DeactivateDataCenter.class, this::onDeactivateDatacenter)
                 .onMessage(MemberUpEvent.class, this::onPeerUp)
                 .onMessage(MemberDownEvent.class, this::onPeerDown)
                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
                 .onMessage(MemberUpEvent.class, this::onPeerUp)
                 .onMessage(MemberDownEvent.class, this::onPeerDown)
                 .onMessage(MemberReachableEvent.class, this::onPeerReachable)
@@ -148,6 +152,11 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
                 .build();
     }
 
                 .build();
     }
 
+    private Behavior<OwnerSupervisorCommand> onDeactivateDatacenter(final DeactivateDataCenter command) {
+        LOG.debug("Deactivating Owner Supervisor on {}", cluster.selfMember());
+        return IdleSupervisor.create();
+    }
+
     private void reassignUnreachableOwners() {
         final Set<String> ownersToReassign = new HashSet<>();
         for (final String owner : ownerToEntity.keys()) {
     private void reassignUnreachableOwners() {
         final Set<String> ownersToReassign = new HashSet<>();
         for (final String owner : ownerToEntity.keys()) {
@@ -326,8 +335,12 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private void handleReachableEvent(final Set<String> roles) {
     }
 
     private void handleReachableEvent(final Set<String> roles) {
-        activeMembers.add(extractRole(roles));
-        assignMissingOwners();
+        if (roles.contains(dataCenter)) {
+            activeMembers.add(extractRole(roles));
+            assignMissingOwners();
+        } else {
+            LOG.debug("Received reachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+        }
     }
 
     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
     }
 
     private Behavior<OwnerSupervisorCommand> onPeerDown(final MemberDownEvent event) {
@@ -345,17 +358,30 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private void handleUnreachableEvent(final Set<String> roles) {
     }
 
     private void handleUnreachableEvent(final Set<String> roles) {
-        activeMembers.remove(extractRole(roles));
-        reassignUnreachableOwners();
+        if (roles.contains(dataCenter)) {
+            activeMembers.remove(extractRole(roles));
+            reassignUnreachableOwners();
+        } else {
+            LOG.debug("Received unreachable event from a foreign datacenter, Ignoring... Roles: {}", roles);
+        }
     }
 
     }
 
-    private static Set<String> getActiveMembers(final Cluster cluster) {
-        final Set<String> activeMembers = new HashSet<>();
-        cluster.state().getMembers().forEach(member -> activeMembers.add(extractRole(member)));
-        activeMembers.removeAll(cluster.state().getUnreachable().stream()
-                .map(OwnerSupervisor::extractRole).collect(Collectors.toSet()));
+    private Set<String> getActiveMembers() {
+        final Set<String> members = new HashSet<>();
+        cluster.state().getMembers().forEach(member -> members.add(extractRole(member)));
+        // filter out unreachable
+        members.removeAll(cluster.state().getUnreachable().stream()
+                .map(OwnerSupervisor::extractRole)
+                .collect(Collectors.toSet()));
+
+        // filter out members not from our datacenter
+        cluster.state().getMembers().forEach(member -> {
+            if (!member.roles().contains(dataCenter)) {
+                members.remove(extractRole(member));
+            }
+        });
 
 
-        return activeMembers;
+        return members;
     }
 
     private static String extractRole(final Member member) {
     }
 
     private static String extractRole(final Member member) {
@@ -363,7 +389,12 @@ public final class OwnerSupervisor extends AbstractBehavior<OwnerSupervisorComma
     }
 
     private static String extractRole(final Set<String> roles) {
     }
 
     private static String extractRole(final Set<String> roles) {
-        return roles.stream().filter(role -> !role.contains(DATACENTER_PREFIX))
+        return roles.stream().filter(role -> !role.startsWith(DATACENTER_PREFIX))
+                .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
+    }
+
+    private String extractDatacenterRole(final Member member) {
+        return member.getRoles().stream().filter(role -> role.startsWith(DATACENTER_PREFIX))
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
 }
                 .findFirst().orElseThrow(() -> new IllegalArgumentException("No valid role found."));
     }
 }
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/ActivateDataCenter.java
new file mode 100644 (file)
index 0000000..96388d3
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+public final class ActivateDataCenter extends OwnerSupervisorCommand implements Serializable {
+    public static final ActivateDataCenter INSTANCE = new ActivateDataCenter();
+
+    private static final long serialVersionUID = 1L;
+
+    private ActivateDataCenter() {
+        // NOOP
+    }
+}
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/command/DeactivateDataCenter.java
new file mode 100644 (file)
index 0000000..eb728eb
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.owner.supervisor.command;
+
+import java.io.Serializable;
+
+public final class DeactivateDataCenter extends OwnerSupervisorCommand implements Serializable {
+    public static final DeactivateDataCenter INSTANCE = new DeactivateDataCenter();
+    private static final long serialVersionUID = 1L;
+
+    private DeactivateDataCenter() {
+        // NOOP
+    }
+}
index 8d10119c796acaa9db370af38f951e116c3f681b..201d6c659be329a1fa73e135cd1cce01327ffc73 100644 (file)
@@ -37,6 +37,8 @@ import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
@@ -56,6 +58,8 @@ public abstract class AbstractNativeEosTest {
     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
 
     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
 
+    protected static final String DEFAULT_DATACENTER = "dc-default";
+
     protected static final List<String> TWO_NODE_SEED_NODES =
             List.of("akka://ClusterSystem@127.0.0.1:2550",
                     "akka://ClusterSystem@127.0.0.1:2551");
     protected static final List<String> TWO_NODE_SEED_NODES =
             List.of("akka://ClusterSystem@127.0.0.1:2550",
                     "akka://ClusterSystem@127.0.0.1:2551");
@@ -65,10 +69,17 @@ public abstract class AbstractNativeEosTest {
                     "akka://ClusterSystem@127.0.0.1:2551",
                     "akka://ClusterSystem@127.0.0.1:2552");
 
                     "akka://ClusterSystem@127.0.0.1:2551",
                     "akka://ClusterSystem@127.0.0.1:2552");
 
+    protected static final List<String> DATACENTER_SEED_NODES =
+            List.of("akka://ClusterSystem@127.0.0.1:2550",
+                    "akka://ClusterSystem@127.0.0.1:2551",
+                    "akka://ClusterSystem@127.0.0.1:2552",
+                    "akka://ClusterSystem@127.0.0.1:2553");
+
     private static final String REMOTE_PROTOCOL = "akka";
     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
     private static final String ROLE_PARAM = "akka.cluster.roles";
     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
     private static final String REMOTE_PROTOCOL = "akka";
     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
     private static final String ROLE_PARAM = "akka.cluster.roles";
     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
+    private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
 
 
     protected static ClusterNode startupRemote(final int port, final List<String> roles)
 
 
     protected static ClusterNode startupRemote(final int port, final List<String> roles)
@@ -120,6 +131,35 @@ public abstract class AbstractNativeEosTest {
                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
     }
 
                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
     }
 
+    protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
+                                                       final List<String> seedNodes, final String dataCenter)
+            throws ExecutionException, InterruptedException {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+        overrides.put(DATA_CENTER_PARAM, dataCenter);
+
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+        final ActorRef<BootstrapCommand> eosBootstrap =
+                Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
+
+        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+                GetRunningContext::new,
+                Duration.ofSeconds(5),
+                Adapter.toTyped(system.scheduler()));
+        final RunningContext runningContext = ask.toCompletableFuture().get();
+
+        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+    }
+
     private static Behavior<BootstrapCommand> rootBehavior() {
         return Behaviors.setup(context -> EOSMain.create());
     }
     private static Behavior<BootstrapCommand> rootBehavior() {
         return Behaviors.setup(context -> EOSMain.create());
     }
@@ -152,20 +192,22 @@ public abstract class AbstractNativeEosTest {
         return listener;
     }
 
         return listener;
     }
 
-    protected static void reachableMember(final ClusterNode node, final String role) {
+    protected static void reachableMember(final ClusterNode node, final String... role) {
         reachableMember(node.getOwnerSupervisor(), role);
     }
 
         reachableMember(node.getOwnerSupervisor(), role);
     }
 
-    protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+    protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                          final String... role) {
         ownerSupervisor.tell(new MemberReachableEvent(
                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
     }
 
         ownerSupervisor.tell(new MemberReachableEvent(
                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
     }
 
-    protected static void unreachableMember(final ClusterNode node, final String role) {
+    protected static void unreachableMember(final ClusterNode node, final String... role) {
         unreachableMember(node.getOwnerSupervisor(), role);
     }
 
         unreachableMember(node.getOwnerSupervisor(), role);
     }
 
-    protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+    protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                            final String... role) {
         ownerSupervisor.tell(new MemberUnreachableEvent(
                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
     }
         ownerSupervisor.tell(new MemberUnreachableEvent(
                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
     }
@@ -192,6 +234,14 @@ public abstract class AbstractNativeEosTest {
         });
     }
 
         });
     }
 
+    protected static void activateDatacenter(final ClusterNode clusterNode) {
+        clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
+    }
+
+    protected static void deactivateDatacenter(final ClusterNode clusterNode) {
+        clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
+    }
+
     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
         await().until(() -> !listener.getChanges().isEmpty());
     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
         await().until(() -> !listener.getChanges().isEmpty());
@@ -267,13 +317,13 @@ public abstract class AbstractNativeEosTest {
 
     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
 
 
     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
 
-        private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class);
+        private final Logger log;
 
         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
         private final String member;
 
         public MockEntityOwnershipListener(final String member) {
 
         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
         private final String member;
 
         public MockEntityOwnershipListener(final String member) {
-
+            log = LoggerFactory.getLogger("EOS-listener-" + member);
             this.member = member;
         }
 
             this.member = member;
         }
 
index 0c4b5ea9f6b618339dd2f433d11365db85a29cf0..a444aefcfb277dc1520cdefb24461c2408c3a720 100644 (file)
@@ -152,15 +152,15 @@ public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest {
         registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2");
 
         final ActorRef<OwnerSupervisorCommand> ownerSupervisor = runningContext.getOwnerSupervisor();
         registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2");
 
         final ActorRef<OwnerSupervisorCommand> ownerSupervisor = runningContext.getOwnerSupervisor();
-        reachableMember(ownerSupervisor, "member-2");
-        unreachableMember(ownerSupervisor, "member-1");
+        reachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER);
+        unreachableMember(ownerSupervisor, "member-1", DEFAULT_DATACENTER);
         verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER);
 
         final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two");
         final Optional<EntityOwnershipState> state = service.getOwnershipState(entity2);
         assertFalse(state.isPresent());
 
         verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER);
 
         final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two");
         final Optional<EntityOwnershipState> state = service.getOwnershipState(entity2);
         assertFalse(state.isPresent());
 
-        unreachableMember(ownerSupervisor, "member-2");
+        unreachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER);
         verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER);
     }
 
         verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER);
     }
 
diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/DataCentersTest.java
new file mode 100644 (file)
index 0000000..440229e
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+
+public class DataCentersTest extends AbstractNativeEosTest {
+
+    private ClusterNode node1 = null;
+    private ClusterNode node2 = null;
+    private ClusterNode node3 = null;
+    private ClusterNode node4 = null;
+    public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
+    public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
+
+    @Before
+    public void setUp() throws Exception {
+        node1 = startupWithDatacenter(2550, Collections.singletonList("member-1"), DATACENTER_SEED_NODES, "dc-primary");
+        node2 = startupWithDatacenter(2551, Collections.singletonList("member-2"), DATACENTER_SEED_NODES, "dc-primary");
+        node3 = startupWithDatacenter(2552, Collections.singletonList("member-3"), DATACENTER_SEED_NODES, "dc-backup");
+        node4 = startupWithDatacenter(2553, Collections.singletonList("member-4"), DATACENTER_SEED_NODES, "dc-backup");
+
+        // need to wait until all nodes are ready
+        final Cluster cluster = Cluster.get(node4.getActorSystem());
+        Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = new ArrayList<>();
+            cluster.state().getMembers().forEach(members::add);
+            if (members.size() != 4) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+    }
+
+    @Test
+    public void testDatacenterActivation() throws Exception {
+        registerCandidates(node1, ENTITY_1, "member-1");
+        registerCandidates(node3, ENTITY_1, "member-3");
+        registerCandidates(node4, ENTITY_1, "member-4");
+
+        activateDatacenter(node1);
+
+        waitUntillOwnerPresent(node1, ENTITY_1);
+        final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+        verifyListenerState(listener1, ENTITY_1, true, true, false);
+
+        final MockEntityOwnershipListener listener2 = registerListener(node3, ENTITY_1);
+        verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+        unregisterCandidates(node1, ENTITY_1, "member-1");
+
+        verifyListenerState(listener1, ENTITY_1, false, false, true);
+        verifyListenerState(listener2, ENTITY_1, false, false, false);
+
+        deactivateDatacenter(node1);
+        activateDatacenter(node4);
+
+        verifyListenerState(listener1, ENTITY_1, true, false, false);
+        verifyListenerState(listener2, ENTITY_1, true, true, false);
+
+        unregisterCandidates(node3, ENTITY_1, "member-3");
+
+        // checking index after notif so current + 1
+        verifyListenerState(listener1, ENTITY_1, true, false, false);
+        verifyListenerState(listener2, ENTITY_1, true, false, true);
+
+        deactivateDatacenter(node3);
+        activateDatacenter(node2);
+
+        // no candidate in dc-primary so no owners after datacenter activation
+        verifyListenerState(listener1, ENTITY_1, false, false, false);
+        verifyListenerState(listener2, ENTITY_1, false, false, false);
+    }
+
+    @Test
+    public void testDataCenterShutdown() {
+        registerCandidates(node1, ENTITY_1, "member-1");
+        registerCandidates(node3, ENTITY_1, "member-3");
+        registerCandidates(node4, ENTITY_1, "member-4");
+
+        activateDatacenter(node1);
+
+        waitUntillOwnerPresent(node1, ENTITY_1);
+        final MockEntityOwnershipListener listener1 = registerListener(node1, ENTITY_1);
+        verifyListenerState(listener1, ENTITY_1, true, true, false);
+
+        final MockEntityOwnershipListener listener2 = registerListener(node3, ENTITY_1);
+        verifyListenerState(listener2, ENTITY_1, true, false, false);
+
+        unregisterCandidates(node1, ENTITY_1, "member-1");
+
+        verifyListenerState(listener1, ENTITY_1, false, false, true);
+        verifyListenerState(listener2, ENTITY_1, false, false, false);
+
+        ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+
+        activateDatacenter(node3);
+        verifyListenerState(listener2, ENTITY_1, true, true, false);
+
+        unregisterCandidates(node3, ENTITY_1, "member-3");
+        verifyListenerState(listener2, ENTITY_1, true, false, true);
+    }
+
+    @After
+    public void tearDown() {
+        ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(node4.getActorSystem(), Duration.ofSeconds(20));
+    }
+
+}
index 043af99aa47e5c69d6f7153cfe6521cc7ba41168..9b8862bbae2117d33fc243052e38e89edc8f4af4 100644 (file)
@@ -24,9 +24,6 @@ public class SingleNodeTest extends AbstractNativeEosTest {
     @Before
     public void setUp() throws Exception {
         clusterNode = startup(2550, List.of("member-1"));
     @Before
     public void setUp() throws Exception {
         clusterNode = startup(2550, List.of("member-1"));
-
-        reachableMember(clusterNode, "member-2");
-        reachableMember(clusterNode, "member-3");
     }
 
     @After
     }
 
     @After
@@ -47,7 +44,13 @@ public class SingleNodeTest extends AbstractNativeEosTest {
     public void testListenerPriorToAddingCandidates() {
         final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
 
     public void testListenerPriorToAddingCandidates() {
         final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
 
-        registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
+        registerCandidates(clusterNode, ENTITY_1, "member-1");
+        waitUntillOwnerPresent(clusterNode, ENTITY_1);
+
+        reachableMember(clusterNode, "member-2", DEFAULT_DATACENTER);
+        reachableMember(clusterNode, "member-3", DEFAULT_DATACENTER);
+
+        registerCandidates(clusterNode, ENTITY_1, "member-2", "member-3");
         verifyListenerState(listener, ENTITY_1, true, true, false);
 
         unregisterCandidates(clusterNode, ENTITY_1, "member-1");
         verifyListenerState(listener, ENTITY_1, true, true, false);
 
         unregisterCandidates(clusterNode, ENTITY_1, "member-1");
@@ -59,6 +62,9 @@ public class SingleNodeTest extends AbstractNativeEosTest {
         registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
         waitUntillOwnerPresent(clusterNode, ENTITY_1);
 
         registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
         waitUntillOwnerPresent(clusterNode, ENTITY_1);
 
+        reachableMember(clusterNode, "member-2", DEFAULT_DATACENTER);
+        reachableMember(clusterNode, "member-3", DEFAULT_DATACENTER);
+
         final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
         verifyListenerState(listener, ENTITY_1, true, true, false);
 
         final MockEntityOwnershipListener listener = registerListener(clusterNode, ENTITY_1);
         verifyListenerState(listener, ENTITY_1, true, true, false);
 
@@ -71,6 +77,9 @@ public class SingleNodeTest extends AbstractNativeEosTest {
         registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
         waitUntillOwnerPresent(clusterNode, ENTITY_1);
 
         registerCandidates(clusterNode, ENTITY_1, "member-1", "member-2", "member-3");
         waitUntillOwnerPresent(clusterNode, ENTITY_1);
 
+        reachableMember(clusterNode, "member-2", DEFAULT_DATACENTER);
+        reachableMember(clusterNode, "member-3", DEFAULT_DATACENTER);
+
         final MockEntityOwnershipListener listener1 = registerListener(clusterNode, ENTITY_1);
         final MockEntityOwnershipListener listener2 = registerListener(clusterNode, ENTITY_2);
 
         final MockEntityOwnershipListener listener1 = registerListener(clusterNode, ENTITY_1);
         final MockEntityOwnershipListener listener2 = registerListener(clusterNode, ENTITY_2);
 
index 871bc005cc8531a5a8fac94d9d667b8231dec034..71fffe12baf65854e43fc279db8a30836651da1d 100644 (file)
@@ -57,7 +57,6 @@ public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
         ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
         ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
 
         ActorTestKit.shutdown(node1.getActorSystem(), Duration.ofSeconds(20));
         ActorTestKit.shutdown(node2.getActorSystem(), Duration.ofSeconds(20));
 
-
         if (node3 != null) {
             ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
         }
         if (node3 != null) {
             ActorTestKit.shutdown(node3.getActorSystem(), Duration.ofSeconds(20));
         }
@@ -142,7 +141,7 @@ public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
         verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
         verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
 
         verifyListenerState(secondEntityListener2, ENTITY_2, true, true, false);
         verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
 
-        unreachableMember(node1, "member-2");
+        unreachableMember(node1, "member-2", DEFAULT_DATACENTER);
 
         verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
         verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
 
         verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
         verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
@@ -152,14 +151,14 @@ public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
         verifyListenerState(secondEntityListener2, ENTITY_2, true, false, true);
         verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
 
         verifyListenerState(secondEntityListener2, ENTITY_2, true, false, true);
         verifyListenerState(secondEntityListener3, ENTITY_2, true, false, false);
 
-        unreachableMember(node1, "member-3");
+        unreachableMember(node1, "member-3", DEFAULT_DATACENTER);
 
         verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
         verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
         verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
 
 
         verifyListenerState(firstEntityListener1, ENTITY_1, true, true, false);
         verifyListenerState(firstEntityListener2, ENTITY_1, true, false, true);
         verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
 
-        unregisterCandidates(node1, ENTITY_1, "member-1");
-        unregisterCandidates(node1, ENTITY_2, "member-1");
+        unregisterCandidates(node1, ENTITY_1, "member-1", DEFAULT_DATACENTER);
+        unregisterCandidates(node1, ENTITY_2, "member-1", DEFAULT_DATACENTER);
 
         verifyListenerState(firstEntityListener1, ENTITY_1, false, false, true);
         verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
 
         verifyListenerState(firstEntityListener1, ENTITY_1, false, false, true);
         verifyListenerState(firstEntityListener2, ENTITY_1, false, false, false);
@@ -169,7 +168,7 @@ public class ThreeNodeReachabilityTest extends AbstractNativeEosTest {
         verifyListenerState(secondEntityListener2, ENTITY_2, false, false, false);
         verifyListenerState(secondEntityListener3, ENTITY_2, false, false, false);
 
         verifyListenerState(secondEntityListener2, ENTITY_2, false, false, false);
         verifyListenerState(secondEntityListener3, ENTITY_2, false, false, false);
 
-        reachableMember(node1, "member-2");
+        reachableMember(node1, "member-2", DEFAULT_DATACENTER);
         verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
         verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
         verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
         verifyListenerState(firstEntityListener1, ENTITY_1, true, false, false);
         verifyListenerState(firstEntityListener2, ENTITY_1, true, true, false);
         verifyListenerState(firstEntityListener3, ENTITY_1, true, false, false);
index 61747622d1c8343e13762e48f6aa0bb47e61a194..8a088e3fdbcef8c48ffb115b8d154aa23cf07f4f 100644 (file)
@@ -43,29 +43,29 @@ public class OwnerSupervisorTest extends AbstractNativeEosTest {
 
         final ClusterNode node = startup(2550, Collections.singletonList("member-1"));
         try {
 
         final ClusterNode node = startup(2550, Collections.singletonList("member-1"));
         try {
-            reachableMember(node, "member-2");
-            reachableMember(node, "member-3");
+            reachableMember(node, "member-2", DEFAULT_DATACENTER);
+            reachableMember(node, "member-3", DEFAULT_DATACENTER);
             registerCandidates(node, ENTITY_1, "member-1", "member-2", "member-3");
 
             final MockEntityOwnershipListener listener = registerListener(node, ENTITY_1);
             verifyListenerState(listener, ENTITY_1,true, true, false);
 
             registerCandidates(node, ENTITY_1, "member-1", "member-2", "member-3");
 
             final MockEntityOwnershipListener listener = registerListener(node, ENTITY_1);
             verifyListenerState(listener, ENTITY_1,true, true, false);
 
-            unreachableMember(node, "member-1");
+            unreachableMember(node, "member-1", DEFAULT_DATACENTER);
             verifyListenerState(listener, ENTITY_1, true, false, true);
 
             verifyListenerState(listener, ENTITY_1, true, false, true);
 
-            unreachableMember(node, "member-2");
+            unreachableMember(node, "member-2", DEFAULT_DATACENTER);
             verifyListenerState(listener, ENTITY_1, true, false, false);
 
             verifyListenerState(listener, ENTITY_1, true, false, false);
 
-            unreachableMember(node, "member-3");
+            unreachableMember(node, "member-3", DEFAULT_DATACENTER);
             verifyListenerState(listener, ENTITY_1, false, false, false);
 
             verifyListenerState(listener, ENTITY_1, false, false, false);
 
-            reachableMember(node, "member-2");
+            reachableMember(node, "member-2", DEFAULT_DATACENTER);
             verifyListenerState(listener, ENTITY_1, true, false, false);
 
             // no notification here as member-2 is already the owner
             verifyListenerState(listener, ENTITY_1, true, false, false);
 
             // no notification here as member-2 is already the owner
-            reachableMember(node, "member-1");
+            reachableMember(node, "member-1", DEFAULT_DATACENTER);
 
 
-            unreachableMember(node, "member-2");
+            unreachableMember(node, "member-2", DEFAULT_DATACENTER);
             verifyListenerState(listener, ENTITY_1,true, true, false);
         } finally {
             ActorTestKit.shutdown(node.getActorSystem());
             verifyListenerState(listener, ENTITY_1,true, true, false);
         } finally {
             ActorTestKit.shutdown(node.getActorSystem());
@@ -96,7 +96,7 @@ public class OwnerSupervisorTest extends AbstractNativeEosTest {
             // this one could not be assigned during init as we dont have member-2 thats reachable
             verifyListenerState(listener2, ENTITY_2, false, false, false);
 
             // this one could not be assigned during init as we dont have member-2 thats reachable
             verifyListenerState(listener2, ENTITY_2, false, false, false);
 
-            reachableMember(node, "member-2");
+            reachableMember(node, "member-2", DEFAULT_DATACENTER);
             verifyListenerState(listener2, ENTITY_2, true, false, false);
         } finally {
             ActorTestKit.shutdown(node.getActorSystem());
             verifyListenerState(listener2, ENTITY_2, true, false, false);
         } finally {
             ActorTestKit.shutdown(node.getActorSystem());