From d8deaa4381251e0900a6ba50d1f16bab47ab8491 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Fri, 7 May 2021 14:04:52 +0200 Subject: [PATCH] Add integration test with cluster-singleton JIRA: CONTROLLER-1982 Change-Id: I75aec3a46da42c38287970590006655a2d15b926 Signed-off-by: Tomas Cere --- .../eos/akka/AkkaEntityOwnershipService.java | 7 +- .../type/EntityTypeListenerActor.java | 2 +- .../eos/akka/AbstractNativeEosTest.java | 40 ++++ .../test/resources/simplelogger.properties | 1 + .../ClusterSingletonIntegrationTest.java | 213 ++++++++++++++++++ 5 files changed, 259 insertions(+), 4 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java index 28ada56230..ffce30dafa 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/AkkaEntityOwnershipService.java @@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory; */ @Singleton @Component(immediate = true, service = { DOMEntityOwnershipService.class, NativeEosService.class }) -public final class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable { +public class AkkaEntityOwnershipService implements DOMEntityOwnershipService, NativeEosService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(AkkaEntityOwnershipService.class); private static final String DATACENTER_PREFIX = "dc"; private static final Duration DATACENTER_OP_TIMEOUT = Duration.ofSeconds(20); @@ -81,10 +81,11 @@ public final class AkkaEntityOwnershipService implements DOMEntityOwnershipServi private final ActorRef candidateRegistry; private final ActorRef listenerRegistry; private final ActorRef ownerStateChecker; - private final ActorRef ownerSupervisor; + protected final ActorRef ownerSupervisor; @VisibleForTesting - AkkaEntityOwnershipService(final ActorSystem actorSystem) throws ExecutionException, InterruptedException { + protected AkkaEntityOwnershipService(final ActorSystem actorSystem) + throws ExecutionException, InterruptedException { final var typedActorSystem = Adapter.toTyped(actorSystem); scheduler = typedActorSystem.scheduler(); diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java index 81772cf155..17391fac91 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/type/EntityTypeListenerActor.java @@ -107,7 +107,7 @@ public class EntityTypeListenerActor extends AbstractBehavior onOwnerChanged(final EntityOwnerChanged rsp) { - LOG.debug("Entity-type: {} listener, owner change: {}", entityType, rsp); + LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp); listener.ownershipChanged(rsp.getOwnershipChange()); return this; diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java index ee1782ad55..576a9c1d12 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java @@ -83,6 +83,24 @@ public abstract class AbstractNativeEosTest { private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes"; private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center"; + protected static MockNativeEntityOwnershipService startupNativeService(final int port, List roles, + final List seedNodes) + throws ExecutionException, InterruptedException { + final Map overrides = new HashMap<>(); + overrides.put(PORT_PARAM, port); + overrides.put(ROLE_PARAM, roles); + if (!seedNodes.isEmpty()) { + overrides.put(SEED_NODES_PARAM, seedNodes); + } + + final Config config = ConfigFactory.parseMap(overrides) + .withFallback(ConfigFactory.load()); + + // Create a classic Akka system since thats what we will have in osgi + final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config); + + return new MockNativeEntityOwnershipService(system); + } protected static ClusterNode startupRemote(final int port, final List roles) throws ExecutionException, InterruptedException { @@ -350,4 +368,26 @@ public abstract class AbstractNativeEosTest { return changes; } } + + protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService { + private ActorSystem classicActorSystem; + + protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem) + throws ExecutionException, InterruptedException { + super(classicActorSystem); + this.classicActorSystem = classicActorSystem; + } + + protected void reachableMember(final String... role) { + AbstractNativeEosTest.reachableMember(ownerSupervisor, role); + } + + public void unreachableMember(final String... role) { + AbstractNativeEosTest.unreachableMember(ownerSupervisor, role); + } + + public ActorSystem getActorSystem() { + return classicActorSystem; + } + } } diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties b/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties index 942eeee610..5afb5e4507 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/eos-dom-akka/src/test/resources/simplelogger.properties @@ -5,3 +5,4 @@ org.slf4j.simpleLogger.logFile=System.out org.slf4j.simpleLogger.showShortLogName=true org.slf4j.simpleLogger.levelInBrackets=true org.slf4j.simpleLogger.log.org.opendaylight.controller.eos.akka=debug +org.slf4j.simpleLogger.log.org.opendaylight.mdsal.singleton=debug \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java b/opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java new file mode 100644 index 0000000000..bef4304460 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-eos-native/src/test/java/org/opendaylight/controller/cluster/akka/eos/service/ClusterSingletonIntegrationTest.java @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.akka.eos.service; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import akka.actor.testkit.typed.javadsl.ActorTestKit; +import akka.actor.typed.javadsl.Adapter; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import akka.cluster.typed.Cluster; +import akka.testkit.javadsl.TestKit; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.akka.eos.AbstractNativeEosTest; +import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterSingletonIntegrationTest extends AbstractNativeEosTest { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterSingletonIntegrationTest.class); + + private MockNativeEntityOwnershipService node1; + private MockNativeEntityOwnershipService node2; + private MockNativeEntityOwnershipService node3; + + private MockSingletonService singletonNode1; + private MockSingletonService singletonNode2; + private MockSingletonService singletonNode3; + + + @Before + public void setUp() throws Exception { + node1 = startupNativeService(2550, List.of("member-1"), THREE_NODE_SEED_NODES); + node2 = startupNativeService(2551, List.of("member-2"), THREE_NODE_SEED_NODES); + node3 = startupNativeService(2552, List.of("member-3"), THREE_NODE_SEED_NODES); + + singletonNode1 = new MockSingletonService(node1); + singletonNode1.initializeProvider(); + + singletonNode2 = new MockSingletonService(node2); + singletonNode2.initializeProvider(); + + singletonNode3 = new MockSingletonService(node3); + singletonNode3.initializeProvider(); + + waitUntillNodeReady(node3); + } + + @After + public void tearDown() { + ActorTestKit.shutdown(Adapter.toTyped(node1.getActorSystem()), Duration.ofSeconds(20)); + ActorTestKit.shutdown(Adapter.toTyped(node2.getActorSystem()), Duration.ofSeconds(20)); + ActorTestKit.shutdown(Adapter.toTyped(node3.getActorSystem()), Duration.ofSeconds(20)); + } + + @Test + public void testSingletonOwnershipNotDropped() { + final MockClusterSingletonService service = new MockClusterSingletonService("member-1", "service-1"); + singletonNode1.registerClusterSingletonService(service); + + verifyServiceActive(service); + + final MockClusterSingletonService service2 = new MockClusterSingletonService("member-2", "service-1"); + singletonNode2.registerClusterSingletonService(service2); + + verifyServiceInactive(service2, 2); + } + + @Test + public void testSingletonOwnershipHandoff() { + final MockClusterSingletonService service = new MockClusterSingletonService("member-1", "service-1"); + final ClusterSingletonServiceRegistration registration = + singletonNode1.registerClusterSingletonService(service); + + verifyServiceActive(service); + + final MockClusterSingletonService service2 = new MockClusterSingletonService("member-2", "service-1"); + singletonNode2.registerClusterSingletonService(service2); + + verifyServiceInactive(service2, 2); + + registration.close(); + verifyServiceInactive(service); + verifyServiceActive(service2); + } + + @Test + public void testSingletonOwnershipHandoffOnNodeShutdown() throws Exception { + MockClusterSingletonService service2 = new MockClusterSingletonService("member-2", "service-1"); + ClusterSingletonServiceRegistration registration2 = + singletonNode2.registerClusterSingletonService(service2); + + verifyServiceActive(service2); + + final MockClusterSingletonService service3 = new MockClusterSingletonService("member-3", "service-1"); + final ClusterSingletonServiceRegistration registration3 = + singletonNode3.registerClusterSingletonService(service3); + + verifyServiceInactive(service3, 2); + + LOG.debug("Shutting down node2"); + TestKit.shutdownActorSystem(node2.getActorSystem()); + verifyServiceActive(service3); + + node2 = startupNativeService(2551, List.of("member-1"), THREE_NODE_SEED_NODES); + singletonNode2 = new MockSingletonService(node2); + singletonNode2.initializeProvider(); + + waitUntillNodeReady(node2); + service2 = new MockClusterSingletonService("member-2", "service-1"); + singletonNode2.registerClusterSingletonService(service2); + + verifyServiceActive(service3); + verifyServiceInactive(service2, 5); + } + + private void waitUntillNodeReady(MockNativeEntityOwnershipService node) { + // need to wait until all nodes are ready + final Cluster cluster = Cluster.get(Adapter.toTyped(node.getActorSystem())); + Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> { + final List members = new ArrayList<>(); + cluster.state().getMembers().forEach(members::add); + if (members.size() != 3) { + return false; + } + + for (final Member member : members) { + if (!member.status().equals(MemberStatus.up())) { + return false; + } + } + + return true; + }); + } + + private static void verifyServiceActive(MockClusterSingletonService service) { + await().untilAsserted(() -> assertTrue(service.isActivated())); + } + + private static void verifyServiceActive(MockClusterSingletonService service, long delay) { + await().pollDelay(delay, TimeUnit.SECONDS).untilAsserted(() -> assertTrue(service.isActivated())); + } + + private static void verifyServiceInactive(MockClusterSingletonService service) { + await().untilAsserted(() -> assertFalse(service.isActivated())); + } + + private static void verifyServiceInactive(MockClusterSingletonService service, long delay) { + await().pollDelay(delay, TimeUnit.SECONDS).untilAsserted(() -> assertFalse(service.isActivated())); + } + + private static class MockClusterSingletonService implements ClusterSingletonService { + + private final String member; + private final ServiceGroupIdentifier identifier; + private boolean activated = false; + + MockClusterSingletonService(String member, String identifier) { + this.member = member; + this.identifier = ServiceGroupIdentifier.create(identifier); + } + + @Override + public void instantiateServiceInstance() { + LOG.debug("{} : Activating service: {}", member, identifier); + activated = true; + } + + @Override + public ListenableFuture closeServiceInstance() { + LOG.debug("{} : Closing service: {}", member, identifier); + activated = false; + return Futures.immediateFuture(null); + } + + @Override + public ServiceGroupIdentifier getIdentifier() { + return identifier; + } + + public boolean isActivated() { + return activated; + } + } + + private static class MockSingletonService extends DOMClusterSingletonServiceProviderImpl { + MockSingletonService(DOMEntityOwnershipService entityOwnershipService) { + super(entityOwnershipService); + } + } +} -- 2.36.6