X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2FAbstractNativeEosTest.java;h=c4e97d60cbaea847e747921b4e61c9e4f9df1d03;hp=5af0e2afdbffffa48aea202e4b2827ae998db2f0;hb=HEAD;hpb=3a526427c93dc44700ca476e57f0cea6eadfb2a7 diff --git a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java index 5af0e2afdb..6adba42c09 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java +++ b/opendaylight/md-sal/eos-dom-akka/src/test/java/org/opendaylight/controller/eos/akka/AbstractNativeEosTest.java @@ -19,6 +19,8 @@ import akka.actor.typed.javadsl.AskPattern; import akka.actor.typed.javadsl.Behaviors; import akka.cluster.ddata.LWWRegister; import akka.cluster.ddata.LWWRegisterKey; +import akka.cluster.ddata.ORMap; +import akka.cluster.ddata.ORSet; import akka.cluster.ddata.typed.javadsl.DistributedData; import akka.cluster.ddata.typed.javadsl.Replicator; import com.typesafe.config.Config; @@ -44,13 +46,19 @@ import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReach import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; +import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand; import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate; import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate; +import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged; import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener; import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand; +import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext; +import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator; +import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator; +import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; -import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,13 +85,18 @@ public abstract class AbstractNativeEosTest { "akka://ClusterSystem@127.0.0.1:2552", "akka://ClusterSystem@127.0.0.1:2553"); + private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator(); + + protected static BindingCodecContext CODEC_CONTEXT + = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext()); + private static final String REMOTE_PROTOCOL = "akka"; private static final String PORT_PARAM = "akka.remote.artery.canonical.port"; private static final String ROLE_PARAM = "akka.cluster.roles"; private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes"; private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center"; - protected static MockNativeEntityOwnershipService startupNativeService(final int port, List roles, + protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List roles, final List seedNodes) throws ExecutionException, InterruptedException { final Map overrides = new HashMap<>(); @@ -154,34 +167,55 @@ public abstract class AbstractNativeEosTest { protected static ClusterNode startupWithDatacenter(final int port, final List roles, final List seedNodes, final String dataCenter) throws ExecutionException, InterruptedException { + final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter); + final ActorRef eosBootstrap = + Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap"); + + final CompletionStage ask = AskPattern.ask(eosBootstrap, + GetRunningContext::new, + Duration.ofSeconds(5), + Adapter.toTyped(system.scheduler())); + final RunningContext runningContext = ask.toCompletableFuture().get(); + + return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), + runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); + } + + protected static akka.actor.ActorSystem startupActorSystem(final int port, final List roles, + final List seedNodes) { final Map overrides = new HashMap<>(); overrides.put(PORT_PARAM, port); overrides.put(ROLE_PARAM, roles); if (!seedNodes.isEmpty()) { overrides.put(SEED_NODES_PARAM, seedNodes); } - overrides.put(DATA_CENTER_PARAM, dataCenter); final Config config = ConfigFactory.parseMap(overrides) .withFallback(ConfigFactory.load()); // Create a classic Akka system since thats what we will have in osgi - final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config); - final ActorRef eosBootstrap = - Adapter.spawn(system, EOSMain.create(), "EOSBootstrap"); + return akka.actor.ActorSystem.create("ClusterSystem", config); + } - final CompletionStage ask = AskPattern.ask(eosBootstrap, - GetRunningContext::new, - Duration.ofSeconds(5), - Adapter.toTyped(system.scheduler())); - final RunningContext runningContext = ask.toCompletableFuture().get(); + protected static akka.actor.ActorSystem startupActorSystem(final int port, final List roles, + final List seedNodes, final String dataCenter) { + final Map overrides = new HashMap<>(); + overrides.put(PORT_PARAM, port); + overrides.put(ROLE_PARAM, roles); + if (!seedNodes.isEmpty()) { + overrides.put(SEED_NODES_PARAM, seedNodes); + } + overrides.put(DATA_CENTER_PARAM, dataCenter); - return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(), - runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor()); + final Config config = ConfigFactory.parseMap(overrides) + .withFallback(ConfigFactory.load()); + + // Create a classic Akka system since thats what we will have in osgi + return akka.actor.ActorSystem.create("ClusterSystem", config); } private static Behavior rootBehavior() { - return Behaviors.setup(context -> EOSMain.create()); + return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec())); } protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) { @@ -233,7 +267,7 @@ public abstract class AbstractNativeEosTest { } protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) { - await().until(() -> { + await().atMost(Duration.ofSeconds(15)).until(() -> { final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem()); final CompletionStage>> ask = AskPattern.ask(distributedData.replicator(), @@ -254,6 +288,32 @@ public abstract class AbstractNativeEosTest { }); } + protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity, + final String candidate) { + await().atMost(Duration.ofSeconds(15)).until(() -> { + final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem()); + + final CompletionStage>>> ask = + AskPattern.ask(distributedData.replicator(), + replyTo -> new Replicator.Get<>( + CandidateRegistry.KEY, Replicator.readLocal(), replyTo), + Duration.ofSeconds(5), + clusterNode.getActorSystem().scheduler()); + + final Replicator.GetResponse>> response = + ask.toCompletableFuture().get(5, TimeUnit.SECONDS); + + if (response instanceof Replicator.GetSuccess) { + final Map> entries = + ((Replicator.GetSuccess>>) response).dataValue().getEntries(); + + return entries.get(entity).contains(candidate); + + } + return false; + }); + } + protected static CompletableFuture activateDatacenter(final ClusterNode clusterNode) { final CompletionStage ask = AskPattern.ask(clusterNode.getOwnerSupervisor(), @@ -276,14 +336,14 @@ public abstract class AbstractNativeEosTest { final boolean hasOwner, final boolean isOwner, final boolean wasOwner) { await().until(() -> !listener.getChanges().isEmpty()); - await().untilAsserted(() -> { - final List changes = listener.getChanges(); - final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1); - assertEquals(entity, domEntityOwnershipChange.getEntity()); + await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + final var changes = listener.getChanges(); + final var domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1); + assertEquals(entity, domEntityOwnershipChange.entity()); - assertEquals(hasOwner, domEntityOwnershipChange.getState().hasOwner()); - assertEquals(isOwner, domEntityOwnershipChange.getState().isOwner()); - assertEquals(wasOwner, domEntityOwnershipChange.getState().wasOwner()); + assertEquals(hasOwner, domEntityOwnershipChange.change().hasOwner()); + assertEquals(isOwner, domEntityOwnershipChange.change().isOwner()); + assertEquals(wasOwner, domEntityOwnershipChange.change().wasOwner()); }); } @@ -291,12 +351,12 @@ public abstract class AbstractNativeEosTest { verifyNoNotifications(listener, 2); } - protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) { + protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) { await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty()); } protected static void verifyNoAdditionalNotifications( - final MockEntityOwnershipListener listener, long delaySeconds) { + final MockEntityOwnershipListener listener, final long delaySeconds) { listener.resetListener(); verifyNoNotifications(listener, delaySeconds); } @@ -356,11 +416,9 @@ public abstract class AbstractNativeEosTest { } protected static final class MockEntityOwnershipListener implements DOMEntityOwnershipListener { - - private final Logger log; - - private final List changes = new ArrayList<>(); + private final List changes = new ArrayList<>(); private final String member; + private final Logger log; public MockEntityOwnershipListener(final String member) { log = LoggerFactory.getLogger("EOS-listener-" + member); @@ -368,13 +426,15 @@ public abstract class AbstractNativeEosTest { } @Override - public void ownershipChanged(final DOMEntityOwnershipChange ownershipChange) { - log.info("{} Received ownershipCHanged: {}", member, ownershipChange); + public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change, + final boolean inJeopardy) { + final var changed = new EntityOwnerChanged(entity, change, inJeopardy); + log.info("{} Received ownershipCHanged: {}", member, changed); log.info("{} changes: {}", member, changes.size()); - changes.add(ownershipChange); + changes.add(changed); } - public List getChanges() { + public List getChanges() { return changes; } @@ -384,11 +444,11 @@ public abstract class AbstractNativeEosTest { } protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService { - private ActorSystem classicActorSystem; + private final ActorSystem classicActorSystem; - protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem) + protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem) throws ExecutionException, InterruptedException { - super(classicActorSystem); + super(classicActorSystem, CODEC_CONTEXT); this.classicActorSystem = classicActorSystem; }