/* * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.eos.akka; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import akka.actor.ActorSystem; import akka.actor.testkit.typed.javadsl.ActorTestKit; import akka.actor.typed.ActorRef; import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.AskPattern; import akka.cluster.ddata.ORMap; import akka.cluster.ddata.ORSet; import akka.cluster.ddata.typed.javadsl.DistributedData; import akka.cluster.ddata.typed.javadsl.Replicator; import com.typesafe.config.ConfigFactory; import java.time.Duration; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import org.awaitility.Durations; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration; import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest { static final String ENTITY_TYPE = "test"; static final String ENTITY_TYPE2 = "test2"; static final QName QNAME = QName.create("test", "2015-08-11", "foo"); static int ID_COUNTER = 1; private ActorSystem system; private akka.actor.typed.ActorSystem typedSystem; private AkkaEntityOwnershipService service; private ActorRef replicator; @Before public void setUp() throws Exception { system = ActorSystem.create("ClusterSystem", ConfigFactory.load()); typedSystem = Adapter.toTyped(this.system); replicator = DistributedData.get(typedSystem).replicator(); service = new AkkaEntityOwnershipService(system); } @After public void tearDown() throws InterruptedException, ExecutionException { service.close(); ActorTestKit.shutdown(Adapter.toTyped(system), Duration.ofSeconds(20)); } @Test public void testRegisterCandidate() throws Exception { final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity); verifyEntityOwnershipCandidateRegistration(entity, reg); verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1"); try { service.registerCandidate(entity); fail("Expected CandidateAlreadyRegisteredException"); } catch (final CandidateAlreadyRegisteredException e) { // expected assertEquals("getEntity", entity, e.getEntity()); } final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE2, entityId); final DOMEntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2); verifyEntityOwnershipCandidateRegistration(entity2, reg2); verifyEntityCandidateRegistered(ENTITY_TYPE2, entityId, "member-1"); } @Test public void testUnregisterCandidate() throws Exception { final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity); verifyEntityOwnershipCandidateRegistration(entity, reg); verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1"); reg.close(); verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1"); service.registerCandidate(entity); verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1"); } @Test public void testListenerRegistration() throws Exception { final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId); final MockEntityOwnershipListener listener = new MockEntityOwnershipListener("member-1"); final DOMEntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener); assertNotNull("EntityOwnershipListenerRegistration null", reg); assertEquals("getEntityType", entity.getType(), reg.getEntityType()); assertEquals("getInstance", listener, reg.getInstance()); final DOMEntityOwnershipCandidateRegistration candidate = service.registerCandidate(entity); verifyListenerState(listener, entity, true, true, false); final int changes = listener.getChanges().size(); reg.close(); candidate.close(); verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1"); service.registerCandidate(entity); // check listener not called when listener registration closed await().pollDelay(Durations.TWO_SECONDS).until(() -> listener.getChanges().size() == changes); } @Test public void testGetOwnershipState() throws Exception { final DOMEntity entity = new DOMEntity(ENTITY_TYPE, "one"); final DOMEntityOwnershipCandidateRegistration registration = service.registerCandidate(entity); verifyGetOwnershipState(service, entity, EntityOwnershipState.IS_OWNER); final RunningContext runningContext = service.getRunningContext(); registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2"); final ActorRef ownerSupervisor = runningContext.getOwnerSupervisor(); reachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER); unreachableMember(ownerSupervisor, "member-1", DEFAULT_DATACENTER); verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER); final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two"); final Optional state = service.getOwnershipState(entity2); assertFalse(state.isPresent()); unreachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER); verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER); } @Test public void testIsCandidateRegistered() throws Exception { final DOMEntity test = new DOMEntity("test-type", "test"); assertFalse(service.isCandidateRegistered(test)); service.registerCandidate(test); assertTrue(service.isCandidateRegistered(test)); } private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity, final EntityOwnershipState expState) { await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { final Optional state = service.getOwnershipState(entity); assertTrue("getOwnershipState present", state.isPresent()); assertEquals("EntityOwnershipState", expState, state.get()); }); } private void verifyEntityCandidateRegistered(final String entityType, final YangInstanceIdentifier entityId, final String candidateName) { await().atMost(Duration.ofSeconds(5)) .untilAsserted(() -> doVerifyEntityCandidateRegistered(entityType, entityId, candidateName)); } private void doVerifyEntityCandidateRegistered(final String entityType, final YangInstanceIdentifier entityId, final String candidateName) throws ExecutionException, InterruptedException { final Map> entries = getCandidateData(); final DOMEntity entity = new DOMEntity(entityType, entityId); assertTrue(entries.containsKey(entity)); assertTrue(entries.get(entity).getElements().contains(candidateName)); } private void verifyEntityCandidateMissing(final String entityType, final YangInstanceIdentifier entityId, final String candidateName) { await().atMost(Duration.ofSeconds(5)) .untilAsserted(() -> doVerifyEntityCandidateMissing(entityType, entityId, candidateName)); } private void doVerifyEntityCandidateMissing(final String entityType, final YangInstanceIdentifier entityId, final String candidateName) throws ExecutionException, InterruptedException { final Map> entries = getCandidateData(); final DOMEntity entity = new DOMEntity(entityType, entityId); assertTrue(entries.containsKey(entity)); assertFalse(entries.get(entity).getElements().contains(candidateName)); } private Map> getCandidateData() throws ExecutionException, InterruptedException { final CompletionStage>>> ask = AskPattern.ask(replicator, replyTo -> new Replicator.Get<>( CandidateRegistry.KEY, Replicator.readLocal(), replyTo), Duration.ofSeconds(5), typedSystem.scheduler()); final Replicator.GetResponse>> response = ask.toCompletableFuture().get(); assertTrue(response instanceof Replicator.GetSuccess); final Replicator.GetSuccess>> success = (Replicator.GetSuccess>>) response; return success.get(CandidateRegistry.KEY).getEntries(); } private static void verifyEntityOwnershipCandidateRegistration(final DOMEntity entity, final DOMEntityOwnershipCandidateRegistration reg) { assertNotNull("EntityOwnershipCandidateRegistration null", reg); assertEquals("getInstance", entity, reg.getInstance()); } }