Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / test / java / org / opendaylight / controller / eos / akka / AbstractNativeEosTest.java
index 201d6c659be329a1fa73e135cd1cce01327ffc73..6adba42c098d8e65dba74a735cc88e821884c940 100644 (file)
@@ -19,6 +19,8 @@ import akka.actor.typed.javadsl.AskPattern;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
 import akka.cluster.ddata.typed.javadsl.Replicator;
 import com.typesafe.config.Config;
@@ -29,6 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -42,13 +45,20 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateD
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
 import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
+import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
+import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
+import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
-import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,12 +85,35 @@ public abstract class AbstractNativeEosTest {
                     "akka://ClusterSystem@127.0.0.1:2552",
                     "akka://ClusterSystem@127.0.0.1:2553");
 
+    private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
+
+    protected static BindingCodecContext CODEC_CONTEXT
+            = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
+
     private static final String REMOTE_PROTOCOL = "akka";
     private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
     private static final String ROLE_PARAM = "akka.cluster.roles";
     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
 
+    protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
+                                                                           final List<String> seedNodes)
+            throws ExecutionException, InterruptedException {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+
+        return new MockNativeEntityOwnershipService(system);
+    }
 
     protected static ClusterNode startupRemote(final int port, final List<String> roles)
             throws ExecutionException, InterruptedException {
@@ -134,34 +167,55 @@ public abstract class AbstractNativeEosTest {
     protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
                                                        final List<String> seedNodes, final String dataCenter)
             throws ExecutionException, InterruptedException {
+        final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
+        final ActorRef<BootstrapCommand> eosBootstrap =
+                Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
+
+        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+                GetRunningContext::new,
+                Duration.ofSeconds(5),
+                Adapter.toTyped(system.scheduler()));
+        final RunningContext runningContext = ask.toCompletableFuture().get();
+
+        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+    }
+
+    protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+                                                               final List<String> seedNodes) {
         final Map<String, Object> overrides = new HashMap<>();
         overrides.put(PORT_PARAM, port);
         overrides.put(ROLE_PARAM, roles);
         if (!seedNodes.isEmpty()) {
             overrides.put(SEED_NODES_PARAM, seedNodes);
         }
-        overrides.put(DATA_CENTER_PARAM, dataCenter);
 
         final Config config = ConfigFactory.parseMap(overrides)
                 .withFallback(ConfigFactory.load());
 
         // Create a classic Akka system since thats what we will have in osgi
-        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
-        final ActorRef<BootstrapCommand> eosBootstrap =
-                Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
+        return akka.actor.ActorSystem.create("ClusterSystem", config);
+    }
 
-        final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
-                GetRunningContext::new,
-                Duration.ofSeconds(5),
-                Adapter.toTyped(system.scheduler()));
-        final RunningContext runningContext = ask.toCompletableFuture().get();
+    protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+                                                               final List<String> seedNodes, final String dataCenter) {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+        overrides.put(DATA_CENTER_PARAM, dataCenter);
 
-        return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
-                runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        return akka.actor.ActorSystem.create("ClusterSystem", config);
     }
 
     private static Behavior<BootstrapCommand> rootBehavior() {
-        return Behaviors.setup(context -> EOSMain.create());
+        return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
     }
 
     protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
@@ -213,7 +267,7 @@ public abstract class AbstractNativeEosTest {
     }
 
     protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
-        await().until(() -> {
+        await().atMost(Duration.ofSeconds(15)).until(() -> {
             final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
             final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
                     AskPattern.ask(distributedData.replicator(),
@@ -234,31 +288,77 @@ public abstract class AbstractNativeEosTest {
         });
     }
 
-    protected static void activateDatacenter(final ClusterNode clusterNode) {
-        clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
+    protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
+                                                     final String candidate) {
+        await().atMost(Duration.ofSeconds(15)).until(() -> {
+            final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
+
+            final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
+                    AskPattern.ask(distributedData.replicator(),
+                            replyTo -> new Replicator.Get<>(
+                                    CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
+                            Duration.ofSeconds(5),
+                            clusterNode.getActorSystem().scheduler());
+
+            final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
+                    ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+            if (response instanceof Replicator.GetSuccess) {
+                final Map<DOMEntity, ORSet<String>> entries =
+                        ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
+
+                return entries.get(entity).contains(candidate);
+
+            }
+            return false;
+        });
+    }
+
+    protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(clusterNode.getOwnerSupervisor(),
+                        ActivateDataCenter::new,
+                        Duration.ofSeconds(20),
+                        clusterNode.actorSystem.scheduler());
+        return ask.toCompletableFuture();
     }
 
-    protected static void deactivateDatacenter(final ClusterNode clusterNode) {
-        clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
+    protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
+        final CompletionStage<OwnerSupervisorReply> ask =
+                AskPattern.ask(clusterNode.getOwnerSupervisor(),
+                        DeactivateDataCenter::new,
+                        Duration.ofSeconds(20),
+                        clusterNode.actorSystem.scheduler());
+        return ask.toCompletableFuture();
     }
 
     protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
                                               final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
         await().until(() -> !listener.getChanges().isEmpty());
 
-        await().untilAsserted(() -> {
-            final List<DOMEntityOwnershipChange> changes = listener.getChanges();
-            final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
-            assertEquals(entity, domEntityOwnershipChange.getEntity());
+        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+            final var changes = listener.getChanges();
+            final var domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
+            assertEquals(entity, domEntityOwnershipChange.entity());
 
-            assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner());
-            assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner());
-            assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner());
+            assertEquals(hasOwner, domEntityOwnershipChange.change().hasOwner());
+            assertEquals(isOwner, domEntityOwnershipChange.change().isOwner());
+            assertEquals(wasOwner, domEntityOwnershipChange.change().wasOwner());
         });
     }
 
     protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
-        await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+        verifyNoNotifications(listener, 2);
+    }
+
+    protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
+        await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+    }
+
+    protected static void verifyNoAdditionalNotifications(
+            final MockEntityOwnershipListener listener, final long delaySeconds) {
+        listener.resetListener();
+        verifyNoNotifications(listener, delaySeconds);
     }
 
     protected static final class ClusterNode {
@@ -316,11 +416,9 @@ public abstract class AbstractNativeEosTest {
     }
 
     protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener {
-
-        private final Logger log;
-
-        private final List<DOMEntityOwnershipChange> changes = new ArrayList<>();
+        private final List<EntityOwnerChanged> changes = new ArrayList<>();
         private final String member;
+        private final Logger log;
 
         public MockEntityOwnershipListener(final String member) {
             log = LoggerFactory.getLogger("EOS-listener-" + member);
@@ -328,14 +426,42 @@ public abstract class AbstractNativeEosTest {
         }
 
         @Override
-        public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) {
-            log.info("{} Received ownershipCHanged: {}", member, ownershipChange);
+        public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
+                final boolean inJeopardy) {
+            final var changed = new EntityOwnerChanged(entity, change, inJeopardy);
+            log.info("{} Received ownershipCHanged: {}", member, changed);
             log.info("{} changes: {}", member, changes.size());
-            changes.add(ownershipChange);
+            changes.add(changed);
         }
 
-        public List<DOMEntityOwnershipChange> getChanges() {
+        public List<EntityOwnerChanged> getChanges() {
             return changes;
         }
+
+        public void resetListener() {
+            changes.clear();
+        }
+    }
+
+    protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
+        private final ActorSystem classicActorSystem;
+
+        protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
+                throws ExecutionException, InterruptedException {
+            super(classicActorSystem, CODEC_CONTEXT);
+            this.classicActorSystem = classicActorSystem;
+        }
+
+        protected void reachableMember(final String... role) {
+            AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
+        }
+
+        public void unreachableMember(final String... role) {
+            AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
+        }
+
+        public ActorSystem getActorSystem() {
+            return classicActorSystem;
+        }
     }
 }