Tune eos gossip/notification intervals
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
index 8d10119c796acaa9db370af38f951e116c3f681b..5af0e2afdbffffa48aea202e4b2827ae998db2f0 100644 (file)
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -37,9 +38,12 @@ import org.opendaylight.controller.eos.akka.bootstrap.EOSMain;
 import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand;
 import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext;
 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
@@ -56,6 +60,8 @@ public abstract class AbstractNativeEosTest {
     public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1");
     public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2");
 
+    protected static final String DEFAULT_DATACENTER = "dc-default";
+
     protected static final List<String> TWO_NODE_SEED_NODES =
             List.of("akka://ClusterSystem@127.0.0.1:2550",
                     "akka://ClusterSystem@127.0.0.1:2551");
@@ -65,11 +71,36 @@ public abstract class AbstractNativeEosTest {
                     "akka://ClusterSystem@127.0.0.1:2551",
                     "akka://ClusterSystem@127.0.0.1:2552");
 
+    protected static final List<String> DATACENTER_SEED_NODES =
+            List.of("akka://ClusterSystem@127.0.0.1:2550",
+                    "akka://ClusterSystem@127.0.0.1:2551",
+                    "akka://ClusterSystem@127.0.0.1:2552",
+                    "akka://ClusterSystem@127.0.0.1:2553");
+
     private static final String REMOTE_PROTOCOL = "akka";
     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
     private static final String ROLE_PARAM = "akka.cluster.roles";
     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
+    private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
+
+    protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
+                                                                           final List<String> seedNodes)
+            throws ExecutionException, InterruptedException {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
 
+        return new MockNativeEntityOwnershipService(system);
+    }
 
     protected static ClusterNode startupRemote(final int port, final List<String> roles)
             throws ExecutionException, InterruptedException {
@@ -120,6 +151,35 @@ public abstract class AbstractNativeEosTest {
                 runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
     }
 
+    protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
+                                                       final List<String> seedNodes, final String dataCenter)
+            throws ExecutionException, InterruptedException {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+        overrides.put(DATA_CENTER_PARAM, dataCenter);
+
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+        final ActorRef<BootstrapCommand> eosBootstrap =
+                Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
+
+        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+                GetRunningContext::new,
+                Duration.ofSeconds(5),
+                Adapter.toTyped(system.scheduler()));
+        final RunningContext runningContext = ask.toCompletableFuture().get();
+
+        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+    }
+
     private static Behavior<BootstrapCommand> rootBehavior() {
         return Behaviors.setup(context -> EOSMain.create());
     }
@@ -152,20 +212,22 @@ public abstract class AbstractNativeEosTest {
         return listener;
     }
 
-    protected static void reachableMember(final ClusterNode node, final String role) {
+    protected static void reachableMember(final ClusterNode node, final String... role) {
         reachableMember(node.getOwnerSupervisor(), role);
     }
 
-    protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+    protected static void reachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                          final String... role) {
         ownerSupervisor.tell(new MemberReachableEvent(
                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
     }
 
-    protected static void unreachableMember(final ClusterNode node, final String role) {
+    protected static void unreachableMember(final ClusterNode node, final String... role) {
         unreachableMember(node.getOwnerSupervisor(), role);
     }
 
-    protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor, final String role) {
+    protected static void unreachableMember(final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                            final String... role) {
         ownerSupervisor.tell(new MemberUnreachableEvent(
                 new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role)));
     }
@@ -192,6 +254,24 @@ public abstract class AbstractNativeEosTest {
         });
     }
 
+    protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(clusterNode.getOwnerSupervisor(),
+                        ActivateDataCenter::new,
+                        Duration.ofSeconds(20),
+                        clusterNode.actorSystem.scheduler());
+        return ask.toCompletableFuture();
+    }
+
+    protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(clusterNode.getOwnerSupervisor(),
+                        DeactivateDataCenter::new,
+                        Duration.ofSeconds(20),
+                        clusterNode.actorSystem.scheduler());
+        return ask.toCompletableFuture();
+    }
+
     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
         await().until(() -> !listener.getChanges().isEmpty());
@@ -208,7 +288,17 @@ public abstract class AbstractNativeEosTest {
     }
 
     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
-        await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+        verifyNoNotifications(listener, 2);
+    }
+
+    protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
+        await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+    }
+
+    protected static void verifyNoAdditionalNotifications(
+            final MockEntityOwnershipListener listener, long delaySeconds) {
+        listener.resetListener();
+        verifyNoNotifications(listener, delaySeconds);
     }
 
     protected static final class ClusterNode {
@@ -267,13 +357,13 @@ public abstract class AbstractNativeEosTest {
 
     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
 
-        private final Logger log = LoggerFactory.getLogger(MockEntityOwnershipListener.class);
+        private final Logger log;
 
         private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
         private final String member;
 
         public MockEntityOwnershipListener(final String member) {
-
+            log = LoggerFactory.getLogger("EOS-listener-" + member);
             this.member = member;
         }
 
@@ -287,5 +377,31 @@ public abstract class AbstractNativeEosTest {
         public List<DOMEntityOwnershipChange> getChanges() {
             return changes;
         }
+
+        public void resetListener() {
+            changes.clear();
+        }
+    }
+
+    protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
+        private ActorSystem classicActorSystem;
+
+        protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
+                throws ExecutionException, InterruptedException {
+            super(classicActorSystem);
+            this.classicActorSystem = classicActorSystem;
+        }
+
+        protected void reachableMember(final String... role) {
+            AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
+        }
+
+        public void unreachableMember(final String... role) {
+            AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
+        }
+
+        public ActorSystem getActorSystem() {
+            return classicActorSystem;
+        }
     }
 }