Add integration test with cluster-singleton 60/96160/15
authorTomas Cere <tomas.cere@pantheon.tech>
Fri, 7 May 2021 12:04:52 +0000 (14:04 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 30 Jun 2021 08:20:02 +0000 (08:20 +0000)
JIRA: CONTROLLER-1982
Change-Id: I75aec3a46da42c38287970590006655a2d15b926
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java
opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java
opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java
opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties
opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java [new file with mode: 0644]

index 28ada5623087796c3682131a5d09cc657af1c3bc..ffce30dafad603d316b2795f8879a1ca3b1c12cc 100644 (file)
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
  */
 @Singleton
 @Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class })
-public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable {
+public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class);
     private static final String DATACENTER_PREFIX = "dc";
     private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20);
@@ -81,10 +81,11 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi
     private final ActorRef<CandidateRegistryCommand> candidateRegistry;
     private final ActorRef<TypeListenerRegistryCommand> listenerRegistry;
     private final ActorRef<StateCheckerCommand> ownerStateChecker;
-    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+    protected final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
 
     @VisibleForTesting
-    AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException {
+    protected AkkaEntityOwnershipService(final ActorSystem actorSystem)
+            throws ExecutionException, InterruptedException {
         final var typedActorSystem = Adapter.toTyped(actorSystem);
         scheduler = typedActorSystem.scheduler();
 
index 81772cf1551926f554c96c8cb79b4040cd051633..17391fac91265a7da322b385a4bd852c4e20b78e 100644 (file)
@@ -107,7 +107,7 @@ public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerComman
     }
 
     private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
-        LOG.debug("Entity-type: {} listener, owner change: {}", entityType, rsp);
+        LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp);
 
         listener.ownershipChanged(rsp.getOwnershipChange());
         return this;
index ee1782ad55566777087f45130c677e538c6f975b..576a9c1d12f07a1a765118ddcebf4cd3a09e507b 100644 (file)
@@ -83,6 +83,24 @@ public abstract class AbstractNativeEosTest {
     private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
     private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
 
+    protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
+                                                                           final List<String> seedNodes)
+            throws ExecutionException, InterruptedException {
+        final Map<String, Object> overrides = new HashMap<>();
+        overrides.put(PORT_PARAM, port);
+        overrides.put(ROLE_PARAM, roles);
+        if (!seedNodes.isEmpty()) {
+            overrides.put(SEED_NODES_PARAM, seedNodes);
+        }
+
+        final Config config = ConfigFactory.parseMap(overrides)
+                .withFallback(ConfigFactory.load());
+
+        // Create a classic Akka system since thats what we will have in osgi
+        final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+
+        return new MockNativeEntityOwnershipService(system);
+    }
 
     protected static ClusterNode startupRemote(final int port, final List<String> roles)
             throws ExecutionException, InterruptedException {
@@ -350,4 +368,26 @@ public abstract class AbstractNativeEosTest {
             return changes;
         }
     }
+
+    protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
+        private ActorSystem classicActorSystem;
+
+        protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
+                throws ExecutionException, InterruptedException {
+            super(classicActorSystem);
+            this.classicActorSystem = classicActorSystem;
+        }
+
+        protected void reachableMember(final String... role) {
+            AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
+        }
+
+        public void unreachableMember(final String... role) {
+            AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
+        }
+
+        public ActorSystem getActorSystem() {
+            return classicActorSystem;
+        }
+    }
 }
index 942eeee6100da4246ed4a9f1af6b1704da0c08ef..5afb5e45078a4dbf82f213a970cbb926409af321 100644 (file)
@@ -5,3 +5,4 @@ org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
 org.slf4j.simpleLogger.log.org.opendaylight.controller.eos.akka=debug
+org.slf4j.simpleLogger.log.org.opendaylight.mdsal.singleton=debug
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java b/opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java
new file mode 100644 (file)
index 0000000..bef4304
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.akka.eos.service;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import akka.actor.testkit.typed.javadsl.ActorTestKit;
+import akka.actor.typed.javadsl.Adapter;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.typed.Cluster;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.akka.eos.AbstractNativeEosTest;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusterSingletonIntegrationTest extends AbstractNativeEosTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonIntegrationTest.class);
+
+    private MockNativeEntityOwnershipService node1;
+    private MockNativeEntityOwnershipService node2;
+    private MockNativeEntityOwnershipService node3;
+
+    private MockSingletonService singletonNode1;
+    private MockSingletonService singletonNode2;
+    private MockSingletonService singletonNode3;
+
+
+    @Before
+    public void setUp() throws Exception {
+        node1 = startupNativeService(2550, List.of("member-1"), THREE_NODE_SEED_NODES);
+        node2 = startupNativeService(2551, List.of("member-2"), THREE_NODE_SEED_NODES);
+        node3 = startupNativeService(2552, List.of("member-3"), THREE_NODE_SEED_NODES);
+
+        singletonNode1 = new MockSingletonService(node1);
+        singletonNode1.initializeProvider();
+
+        singletonNode2 = new MockSingletonService(node2);
+        singletonNode2.initializeProvider();
+
+        singletonNode3 = new MockSingletonService(node3);
+        singletonNode3.initializeProvider();
+
+        waitUntillNodeReady(node3);
+    }
+
+    @After
+    public void tearDown() {
+        ActorTestKit.shutdown(Adapter.toTyped(node1.getActorSystem()), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(Adapter.toTyped(node2.getActorSystem()), Duration.ofSeconds(20));
+        ActorTestKit.shutdown(Adapter.toTyped(node3.getActorSystem()), Duration.ofSeconds(20));
+    }
+
+    @Test
+    public void testSingletonOwnershipNotDropped() {
+        final MockClusterSingletonService service = new MockClusterSingletonService("member-1", "service-1");
+        singletonNode1.registerClusterSingletonService(service);
+
+        verifyServiceActive(service);
+
+        final MockClusterSingletonService service2 = new MockClusterSingletonService("member-2", "service-1");
+        singletonNode2.registerClusterSingletonService(service2);
+
+        verifyServiceInactive(service2, 2);
+    }
+
+    @Test
+    public void testSingletonOwnershipHandoff() {
+        final MockClusterSingletonService service = new MockClusterSingletonService("member-1", "service-1");
+        final ClusterSingletonServiceRegistration registration =
+                singletonNode1.registerClusterSingletonService(service);
+
+        verifyServiceActive(service);
+
+        final MockClusterSingletonService service2 = new MockClusterSingletonService("member-2", "service-1");
+        singletonNode2.registerClusterSingletonService(service2);
+
+        verifyServiceInactive(service2, 2);
+
+        registration.close();
+        verifyServiceInactive(service);
+        verifyServiceActive(service2);
+    }
+
+    @Test
+    public void testSingletonOwnershipHandoffOnNodeShutdown() throws Exception {
+        MockClusterSingletonService service2 = new MockClusterSingletonService("member-2", "service-1");
+        ClusterSingletonServiceRegistration registration2 =
+                singletonNode2.registerClusterSingletonService(service2);
+
+        verifyServiceActive(service2);
+
+        final MockClusterSingletonService service3 = new MockClusterSingletonService("member-3", "service-1");
+        final ClusterSingletonServiceRegistration registration3 =
+                singletonNode3.registerClusterSingletonService(service3);
+
+        verifyServiceInactive(service3, 2);
+
+        LOG.debug("Shutting down node2");
+        TestKit.shutdownActorSystem(node2.getActorSystem());
+        verifyServiceActive(service3);
+
+        node2 = startupNativeService(2551, List.of("member-1"), THREE_NODE_SEED_NODES);
+        singletonNode2 = new MockSingletonService(node2);
+        singletonNode2.initializeProvider();
+
+        waitUntillNodeReady(node2);
+        service2 = new MockClusterSingletonService("member-2", "service-1");
+        singletonNode2.registerClusterSingletonService(service2);
+
+        verifyServiceActive(service3);
+        verifyServiceInactive(service2, 5);
+    }
+
+    private void waitUntillNodeReady(MockNativeEntityOwnershipService node) {
+        // need to wait until all nodes are ready
+        final Cluster cluster = Cluster.get(Adapter.toTyped(node.getActorSystem()));
+        Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+            final List<Member> members = new ArrayList<>();
+            cluster.state().getMembers().forEach(members::add);
+            if (members.size() != 3) {
+                return false;
+            }
+
+            for (final Member member : members) {
+                if (!member.status().equals(MemberStatus.up())) {
+                    return false;
+                }
+            }
+
+            return true;
+        });
+    }
+
+    private static void verifyServiceActive(MockClusterSingletonService service) {
+        await().untilAsserted(() -> assertTrue(service.isActivated()));
+    }
+
+    private static void verifyServiceActive(MockClusterSingletonService service, long delay) {
+        await().pollDelay(delay, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(service.isActivated()));
+    }
+
+    private static void verifyServiceInactive(MockClusterSingletonService service) {
+        await().untilAsserted(() -> assertFalse(service.isActivated()));
+    }
+
+    private static void verifyServiceInactive(MockClusterSingletonService service, long delay) {
+        await().pollDelay(delay, TimeUnit.SECONDS).untilAsserted(() -> assertFalse(service.isActivated()));
+    }
+
+    private static class MockClusterSingletonService implements ClusterSingletonService {
+
+        private final String member;
+        private final ServiceGroupIdentifier identifier;
+        private boolean activated = false;
+
+        MockClusterSingletonService(String member, String identifier) {
+            this.member = member;
+            this.identifier = ServiceGroupIdentifier.create(identifier);
+        }
+
+        @Override
+        public void instantiateServiceInstance() {
+            LOG.debug("{} : Activating service: {}", member, identifier);
+            activated = true;
+        }
+
+        @Override
+        public ListenableFuture<? extends Object> closeServiceInstance() {
+            LOG.debug("{} : Closing service: {}", member, identifier);
+            activated = false;
+            return Futures.immediateFuture(null);
+        }
+
+        @Override
+        public ServiceGroupIdentifier getIdentifier() {
+            return identifier;
+        }
+
+        public boolean isActivated() {
+            return activated;
+        }
+    }
+
+    private static class MockSingletonService extends DOMClusterSingletonServiceProviderImpl {
+        MockSingletonService(DOMEntityOwnershipService entityOwnershipService) {
+            super(entityOwnershipService);
+        }
+    }
+}