2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka;
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
17 import akka.actor.ActorSystem;
18 import akka.actor.testkit.typed.javadsl.ActorTestKit;
19 import akka.actor.typed.ActorRef;
20 import akka.actor.typed.javadsl.Adapter;
21 import akka.actor.typed.javadsl.AskPattern;
22 import akka.cluster.ddata.ORMap;
23 import akka.cluster.ddata.ORSet;
24 import akka.cluster.ddata.typed.javadsl.DistributedData;
25 import akka.cluster.ddata.typed.javadsl.Replicator;
26 import com.typesafe.config.ConfigFactory;
27 import java.time.Duration;
29 import java.util.Optional;
30 import java.util.concurrent.CompletionStage;
31 import java.util.concurrent.ExecutionException;
32 import org.awaitility.Durations;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.controller.eos.akka.bootstrap.command.RunningContext;
37 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
38 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
39 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
40 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
41 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
42 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
43 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListenerRegistration;
44 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
45 import org.opendaylight.yangtools.yang.common.QName;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 public class AkkaEntityOwnershipServiceTest extends AbstractNativeEosTest {
49 static final String ENTITY_TYPE = "test";
50 static final String ENTITY_TYPE2 = "test2";
51 static final QName QNAME = QName.create("test", "2015-08-11", "foo");
52 static int ID_COUNTER = 1;
54 private ActorSystem system;
55 private akka.actor.typed.ActorSystem<Void> typedSystem;
56 private AkkaEntityOwnershipService service;
57 private ActorRef<Replicator.Command> replicator;
60 public void setUp() throws Exception {
61 system = ActorSystem.create("ClusterSystem", ConfigFactory.load());
62 typedSystem = Adapter.toTyped(this.system);
63 replicator = DistributedData.get(typedSystem).replicator();
65 service = new AkkaEntityOwnershipService(system);
69 public void tearDown() throws InterruptedException, ExecutionException {
71 ActorTestKit.shutdown(Adapter.toTyped(system), Duration.ofSeconds(20));
75 public void testRegisterCandidate() throws Exception {
76 final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
77 final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
79 final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
81 verifyEntityOwnershipCandidateRegistration(entity, reg);
82 verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
85 service.registerCandidate(entity);
86 fail("Expected CandidateAlreadyRegisteredException");
87 } catch (final CandidateAlreadyRegisteredException e) {
89 assertEquals("getEntity", entity, e.getEntity());
92 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE2, entityId);
93 final DOMEntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2);
95 verifyEntityOwnershipCandidateRegistration(entity2, reg2);
96 verifyEntityCandidateRegistered(ENTITY_TYPE2, entityId, "member-1");
100 public void testUnregisterCandidate() throws Exception {
101 final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
102 final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
104 final DOMEntityOwnershipCandidateRegistration reg = service.registerCandidate(entity);
106 verifyEntityOwnershipCandidateRegistration(entity, reg);
107 verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
110 verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1");
112 service.registerCandidate(entity);
113 verifyEntityCandidateRegistered(ENTITY_TYPE, entityId, "member-1");
117 public void testListenerRegistration() throws Exception {
119 final YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
120 final DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
121 final MockEntityOwnershipListener listener = new MockEntityOwnershipListener("member-1");
123 final DOMEntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener);
125 assertNotNull("EntityOwnershipListenerRegistration null", reg);
126 assertEquals("getEntityType", entity.getType(), reg.getEntityType());
127 assertEquals("getInstance", listener, reg.getInstance());
129 final DOMEntityOwnershipCandidateRegistration candidate = service.registerCandidate(entity);
131 verifyListenerState(listener, entity, true, true, false);
132 final int changes = listener.getChanges().size();
137 verifyEntityCandidateMissing(ENTITY_TYPE, entityId, "member-1");
139 service.registerCandidate(entity);
140 // check listener not called when listener registration closed
141 await().pollDelay(Durations.TWO_SECONDS).until(() -> listener.getChanges().size() == changes);
145 public void testGetOwnershipState() throws Exception {
146 final DOMEntity entity = new DOMEntity(ENTITY_TYPE, "one");
148 final DOMEntityOwnershipCandidateRegistration registration = service.registerCandidate(entity);
149 verifyGetOwnershipState(service, entity, EntityOwnershipState.IS_OWNER);
151 final RunningContext runningContext = service.getRunningContext();
152 registerCandidates(runningContext.getCandidateRegistry(), entity, "member-2");
154 final ActorRef<OwnerSupervisorCommand> ownerSupervisor = runningContext.getOwnerSupervisor();
155 reachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER);
156 unreachableMember(ownerSupervisor, "member-1", DEFAULT_DATACENTER);
157 verifyGetOwnershipState(service, entity, EntityOwnershipState.OWNED_BY_OTHER);
159 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, "two");
160 final Optional<EntityOwnershipState> state = service.getOwnershipState(entity2);
161 assertFalse(state.isPresent());
163 unreachableMember(ownerSupervisor, "member-2", DEFAULT_DATACENTER);
164 verifyGetOwnershipState(service, entity, EntityOwnershipState.NO_OWNER);
168 public void testIsCandidateRegistered() throws Exception {
169 final DOMEntity test = new DOMEntity("test-type", "test");
171 assertFalse(service.isCandidateRegistered(test));
173 service.registerCandidate(test);
175 assertTrue(service.isCandidateRegistered(test));
178 private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
179 final EntityOwnershipState expState) {
180 await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
181 final Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
182 assertTrue("getOwnershipState present", state.isPresent());
183 assertEquals("EntityOwnershipState", expState, state.get());
187 private void verifyEntityCandidateRegistered(final String entityType,
188 final YangInstanceIdentifier entityId,
189 final String candidateName) {
190 await().atMost(Duration.ofSeconds(5))
191 .untilAsserted(() -> doVerifyEntityCandidateRegistered(entityType, entityId, candidateName));
194 private void doVerifyEntityCandidateRegistered(final String entityType,
195 final YangInstanceIdentifier entityId,
196 final String candidateName)
197 throws ExecutionException, InterruptedException {
198 final Map<DOMEntity, ORSet<String>> entries = getCandidateData();
199 final DOMEntity entity = new DOMEntity(entityType, entityId);
200 assertTrue(entries.containsKey(entity));
201 assertTrue(entries.get(entity).getElements().contains(candidateName));
204 private void verifyEntityCandidateMissing(final String entityType,
205 final YangInstanceIdentifier entityId,
206 final String candidateName) {
207 await().atMost(Duration.ofSeconds(5))
208 .untilAsserted(() -> doVerifyEntityCandidateMissing(entityType, entityId, candidateName));
211 private void doVerifyEntityCandidateMissing(final String entityType,
212 final YangInstanceIdentifier entityId,
213 final String candidateName)
214 throws ExecutionException, InterruptedException {
215 final Map<DOMEntity, ORSet<String>> entries = getCandidateData();
216 final DOMEntity entity = new DOMEntity(entityType, entityId);
217 assertTrue(entries.containsKey(entity));
218 assertFalse(entries.get(entity).getElements().contains(candidateName));
221 private Map<DOMEntity, ORSet<String>> getCandidateData() throws ExecutionException, InterruptedException {
222 final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
223 AskPattern.ask(replicator, replyTo ->
224 new Replicator.Get<>(
225 CandidateRegistry.KEY,
226 Replicator.readLocal(),
228 Duration.ofSeconds(5),
229 typedSystem.scheduler());
231 final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = ask.toCompletableFuture().get();
232 assertTrue(response instanceof Replicator.GetSuccess);
234 final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> success =
235 (Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response;
237 return success.get(CandidateRegistry.KEY).getEntries();
240 private static void verifyEntityOwnershipCandidateRegistration(final DOMEntity entity,
241 final DOMEntityOwnershipCandidateRegistration reg) {
242 assertNotNull("EntityOwnershipCandidateRegistration null", reg);
243 assertEquals("getInstance", entity, reg.getInstance());