/* * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.eos.akka; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.AskPattern; import akka.actor.typed.javadsl.Behaviors; import akka.cluster.ddata.LWWRegister; import akka.cluster.ddata.LWWRegisterKey; import akka.cluster.ddata.ORMap; import akka.cluster.ddata.ORSet; import akka.cluster.ddata.typed.javadsl.DistributedData; import akka.cluster.ddata.typed.javadsl.Replicator; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.opendaylight.controller.eos.akka.bootstrap.EOSMain; import org.opendaylight.controller.eos.akka.bootstrap.command.BootstrapCommand; import org.opendaylight.controller.eos.akka.bootstrap.command.GetRunningContext; import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; import org.opendaylight.controller.eos.akka.owner.supervisor.command.ActivateDataCenter; import org.opendaylight.controller.eos.akka.owner.supervisor.command.DeactivateDataCenter; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged; import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener; import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext; import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator; import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator; import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class AbstractNativeEosTest { public static final DOMEntity ENTITY_1 = new DOMEntity("test-type", "entity-1"); public static final DOMEntity ENTITY_2 = new DOMEntity("test-type-2", "entity-2"); protected static final String DEFAULT_DATACENTER = "dc-default"; protected static final List TWO_NODE_SEED_NODES = List.of("akka://ClusterSystem@127.0.0.1:2550", "akka://ClusterSystem@127.0.0.1:2551"); protected static final List THREE_NODE_SEED_NODES = List.of("akka://ClusterSystem@127.0.0.1:2550", "akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552"); protected static final List DATACENTER_SEED_NODES = List.of("akka://ClusterSystem@127.0.0.1:2550", "akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552", "akka://ClusterSystem@127.0.0.1:2553"); private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator(); protected static BindingCodecContext CODEC_CONTEXT = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext()); private static final String REMOTE_PROTOCOL = "akka"; private static final String PORT_PARAM = "akka.remote.artery.canonical.port"; private static final String ROLE_PARAM = "akka.cluster.roles"; private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes"; private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center"; protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List roles, final List seedNodes) throws ExecutionException, InterruptedException { final Map overrides = new HashMap<>(); overrides.put(PORT_PARAM, port); overrides.put(ROLE_PARAM, roles); if (!seedNodes.isEmpty()) { overrides.put(SEED_NODES_PARAM, seedNodes); } final Config config = ConfigFactory.parseMap(overrides) .withFallback(ConfigFactory.load()); // Create a classic Akka system since thats what we will have in osgi final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config); return new MockNativeEntityOwnershipService(system); } protected static ClusterNode startupRemote(final int port, final List roles) throws ExecutionException, InterruptedException { return startup(port, roles, THREE_NODE_SEED_NODES); } protected static ClusterNode startupRemote(final int port, final List roles, final List seedNodes) throws ExecutionException, InterruptedException { return startup(port, roles, seedNodes); } protected static ClusterNode startup(final int port, final List roles) throws ExecutionException, InterruptedException { return startup(port, roles, List.of()); } protected static ClusterNode startup(final int port, final List roles, final List seedNodes) throws ExecutionException, InterruptedException { return startup(port, roles, seedNodes, AbstractNativeEosTest::rootBehavior); } protected static ClusterNode startup(final int port, final List roles, final List seedNodes, final Supplier> bootstrap) throws ExecutionException, InterruptedException { // Override the configuration final Map overrides = new HashMap<>(4); overrides.put(PORT_PARAM, port); overrides.put(ROLE_PARAM, roles); if (!seedNodes.isEmpty()) { overrides.put(SEED_NODES_PARAM, seedNodes); } final Config config = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load()); // Create a classic Akka system since thats what we will have in osgi final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config); final ActorRef eosBootstrap = Adapter.spawn(system, bootstrap.get(), "EOSBootstrap"); final CompletionStage ask = AskPattern.ask(eosBootstrap, GetRunningContext::new, Duration.ofSeconds(5), Adapter.toTyped(system.scheduler())); final RunningContext runningContext = ask.toCompletableFuture().get(); return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); } protected static ClusterNode startupWithDatacenter(final int port, final List roles, final List seedNodes, final String dataCenter) throws ExecutionException, InterruptedException { final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter); final ActorRef eosBootstrap = Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap"); final CompletionStage ask = AskPattern.ask(eosBootstrap, GetRunningContext::new, Duration.ofSeconds(5), Adapter.toTyped(system.scheduler())); final RunningContext runningContext = ask.toCompletableFuture().get(); return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); } protected static akka.actor.ActorSystem startupActorSystem(final int port, final List roles, final List seedNodes) { final Map overrides = new HashMap<>(); overrides.put(PORT_PARAM, port); overrides.put(ROLE_PARAM, roles); if (!seedNodes.isEmpty()) { overrides.put(SEED_NODES_PARAM, seedNodes); } final Config config = ConfigFactory.parseMap(overrides) .withFallback(ConfigFactory.load()); // Create a classic Akka system since thats what we will have in osgi return akka.actor.ActorSystem.create("ClusterSystem", config); } protected static akka.actor.ActorSystem startupActorSystem(final int port, final List roles, final List seedNodes, final String dataCenter) { final Map overrides = new HashMap<>(); overrides.put(PORT_PARAM, port); overrides.put(ROLE_PARAM, roles); if (!seedNodes.isEmpty()) { overrides.put(SEED_NODES_PARAM, seedNodes); } overrides.put(DATA_CENTER_PARAM, dataCenter); final Config config = ConfigFactory.parseMap(overrides) .withFallback(ConfigFactory.load()); // Create a classic Akka system since thats what we will have in osgi return akka.actor.ActorSystem.create("ClusterSystem", config); } private static Behavior rootBehavior() { return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec())); } protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) { final ActorRef candidateRegistry = node.getCandidateRegistry(); registerCandidates(candidateRegistry, entity, members); } protected static void registerCandidates(final ActorRef candidateRegistry, final DOMEntity entity, final String... members) { for (final String member : members) { candidateRegistry.tell(new RegisterCandidate(entity, member)); } } protected static void unregisterCandidates(final ClusterNode node, final DOMEntity entity, final String... members) { final ActorRef candidateRegistry = node.getCandidateRegistry(); for (final String member : members) { candidateRegistry.tell(new UnregisterCandidate(entity, member)); } } protected static MockEntityOwnershipListener registerListener(final ClusterNode node, final DOMEntity entity) { final ActorRef listenerRegistry = node.getListenerRegistry(); final MockEntityOwnershipListener listener = new MockEntityOwnershipListener(node.getRoles().get(0)); listenerRegistry.tell(new RegisterListener(entity.getType(), listener)); return listener; } protected static void reachableMember(final ClusterNode node, final String... role) { reachableMember(node.getOwnerSupervisor(), role); } protected static void reachableMember(final ActorRef ownerSupervisor, final String... role) { ownerSupervisor.tell(new MemberReachableEvent( new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role))); } protected static void unreachableMember(final ClusterNode node, final String... role) { unreachableMember(node.getOwnerSupervisor(), role); } protected static void unreachableMember(final ActorRef ownerSupervisor, final String... role) { ownerSupervisor.tell(new MemberUnreachableEvent( new Address(REMOTE_PROTOCOL, "ClusterSystem@127.0.0.1:2550"), Set.of(role))); } protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) { await().atMost(Duration.ofSeconds(15)).until(() -> { final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem()); final CompletionStage>> ask = AskPattern.ask(distributedData.replicator(), replyTo -> new Replicator.Get<>( new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo), Duration.ofSeconds(5), clusterNode.getActorSystem().scheduler()); final Replicator.GetResponse> response = ask.toCompletableFuture().get(5, TimeUnit.SECONDS); if (response instanceof Replicator.GetSuccess) { final String owner = ((Replicator.GetSuccess>) response).dataValue().getValue(); return !owner.isEmpty(); } return false; }); } protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity, final String candidate) { await().atMost(Duration.ofSeconds(15)).until(() -> { final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem()); final CompletionStage>>> ask = AskPattern.ask(distributedData.replicator(), replyTo -> new Replicator.Get<>( CandidateRegistry.KEY, Replicator.readLocal(), replyTo), Duration.ofSeconds(5), clusterNode.getActorSystem().scheduler()); final Replicator.GetResponse>> response = ask.toCompletableFuture().get(5, TimeUnit.SECONDS); if (response instanceof Replicator.GetSuccess) { final Map> entries = ((Replicator.GetSuccess>>) response).dataValue().getEntries(); return entries.get(entity).contains(candidate); } return false; }); } protected static CompletableFuture activateDatacenter(final ClusterNode clusterNode) { final CompletionStage ask = AskPattern.ask(clusterNode.getOwnerSupervisor(), ActivateDataCenter::new, Duration.ofSeconds(20), clusterNode.actorSystem.scheduler()); return ask.toCompletableFuture(); } protected static CompletableFuture deactivateDatacenter(final ClusterNode clusterNode) { final CompletionStage ask = AskPattern.ask(clusterNode.getOwnerSupervisor(), DeactivateDataCenter::new, Duration.ofSeconds(20), clusterNode.actorSystem.scheduler()); return ask.toCompletableFuture(); } protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity, final boolean hasOwner, final boolean isOwner, final boolean wasOwner) { await().until(() -> !listener.getChanges().isEmpty()); await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { final var changes = listener.getChanges(); final var domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1); assertEquals(entity, domEntityOwnershipChange.entity()); assertEquals(hasOwner, domEntityOwnershipChange.change().hasOwner()); assertEquals(isOwner, domEntityOwnershipChange.change().isOwner()); assertEquals(wasOwner, domEntityOwnershipChange.change().wasOwner()); }); } protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) { verifyNoNotifications(listener, 2); } protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) { await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty()); } protected static void verifyNoAdditionalNotifications( final MockEntityOwnershipListener listener, final long delaySeconds) { listener.resetListener(); verifyNoNotifications(listener, delaySeconds); } protected static final class ClusterNode { private final int port; private final List roles; private final akka.actor.typed.ActorSystem actorSystem; private final ActorRef eosBootstrap; private final ActorRef listenerRegistry; private final ActorRef candidateRegistry; private final ActorRef ownerSupervisor; private ClusterNode(final int port, final List roles, final ActorSystem actorSystem, final ActorRef eosBootstrap, final ActorRef listenerRegistry, final ActorRef candidateRegistry, final ActorRef ownerSupervisor) { this.port = port; this.roles = roles; this.actorSystem = Adapter.toTyped(actorSystem); this.eosBootstrap = eosBootstrap; this.listenerRegistry = listenerRegistry; this.candidateRegistry = candidateRegistry; this.ownerSupervisor = ownerSupervisor; } public int getPort() { return port; } public akka.actor.typed.ActorSystem getActorSystem() { return actorSystem; } public ActorRef getEosBootstrap() { return eosBootstrap; } public ActorRef getListenerRegistry() { return listenerRegistry; } public ActorRef getCandidateRegistry() { return candidateRegistry; } public ActorRef getOwnerSupervisor() { return ownerSupervisor; } public List getRoles() { return roles; } } protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener { private final List changes = new ArrayList<>(); private final String member; private final Logger log; public MockEntityOwnershipListener(final String member) { log = LoggerFactory.getLogger("EOS-listener-" + member); this.member = member; } @Override public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change, final boolean inJeopardy) { final var changed = new EntityOwnerChanged(entity, change, inJeopardy); log.info("{} Received ownershipCHanged: {}", member, changed); log.info("{} changes: {}", member, changes.size()); changes.add(changed); } public List getChanges() { return changes; } public void resetListener() { changes.clear(); } } protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService { private final ActorSystem classicActorSystem; protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem) throws ExecutionException, InterruptedException { super(classicActorSystem, CODEC_CONTEXT); this.classicActorSystem = classicActorSystem; } protected void reachableMember(final String... role) { AbstractNativeEosTest.reachableMember(ownerSupervisor, role); } public void unreachableMember(final String... role) { AbstractNativeEosTest.unreachableMember(ownerSupervisor, role); } public ActorSystem getActorSystem() { return classicActorSystem; } } }