/*
 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.eos.akka.registry.listener.type;

import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator.Changed;
import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TerminateListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

43 public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
44 private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);
46 private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
47 private final String localMember;
48 private final String entityType;
49 private final DOMEntityOwnershipListener listener;
51 public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
52 final String entityType, final DOMEntityOwnershipListener listener) {
54 this.localMember = localMember;
55 this.entityType = entityType;
56 this.listener = listener;
58 new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
59 DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
60 .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
63 public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
64 final DOMEntityOwnershipListener listener) {
65 return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
69 public Receive<TypeListenerCommand> createReceive() {
70 return newReceiveBuilder()
71 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
72 .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
73 .onMessage(TerminateListener.class, this::onTerminate)
77 private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
78 final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
79 if (response instanceof Changed) {
80 processCandidates(((Changed<ORMap<DOMEntity, ORSet<String>>>) response).get(response.key()).getEntries());
82 LOG.warn("Unexpected notification from replicator: {}", response);
87 private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
88 final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
89 .filter(entry -> entry.getKey().getType().equals(entityType))
90 .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
91 LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);
93 final Set<DOMEntity> removed =
94 ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
95 if (!removed.isEmpty()) {
96 LOG.debug("Stopping listeners for {}", removed);
97 // kill actors for the removed
98 removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
101 for (final Entry<DOMEntity, ORSet<String>> entry : filteredCandidates.entrySet()) {
102 activeListeners.computeIfAbsent(entry.getKey(), key -> {
103 // spawn actor for this entity
104 LOG.debug("Starting listener for {}", key);
105 return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
106 "SingleEntityListener-" + encodeEntityToActorName(key));
111 private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
112 LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp);
113 listener.ownershipChanged(rsp.entity(), rsp.change(), false);
117 private Behavior<TypeListenerCommand> onTerminate(final TerminateListener command) {
118 LOG.debug("Terminating listener for type: {}, listener: {}", entityType, listener);
119 return Behaviors.stopped();
122 private static String encodeEntityToActorName(final DOMEntity entity) {
123 return "type=" + entity.getType() + ",entity="
124 + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();