2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.registry.listener.type;
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.ORMap;
17 import akka.cluster.ddata.ORSet;
18 import akka.cluster.ddata.typed.javadsl.DistributedData;
19 import akka.cluster.ddata.typed.javadsl.Replicator.Changed;
20 import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
21 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
22 import com.google.common.collect.ImmutableSet;
23 import com.google.common.collect.Sets;
24 import java.time.Duration;
25 import java.util.HashMap;
27 import java.util.Map.Entry;
29 import java.util.UUID;
30 import java.util.stream.Collectors;
31 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
32 import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
33 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
34 import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
35 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
36 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
37 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
38 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
43 private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);
45 private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
46 private final String localMember;
47 private final String entityType;
48 private final DOMEntityOwnershipListener listener;
50 public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
51 final String entityType, final DOMEntityOwnershipListener listener) {
53 this.localMember = localMember;
54 this.entityType = entityType;
55 this.listener = listener;
57 new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
58 DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
59 .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
62 public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
63 final DOMEntityOwnershipListener listener) {
64 return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
68 public Receive<TypeListenerCommand> createReceive() {
69 return newReceiveBuilder()
70 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
71 .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
75 private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
76 final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
77 if (response instanceof Changed) {
78 processCandidates(((Changed<ORMap<DOMEntity, ORSet<String>>>) response).get(response.key()).getEntries());
80 LOG.warn("Unexpected notification from replicator: {}", response);
85 private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
86 final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
87 .filter(entry -> entry.getKey().getType().equals(entityType))
88 .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
89 LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);
91 final Set<DOMEntity> removed =
92 ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
93 if (!removed.isEmpty()) {
94 LOG.debug("Stopping listeners for {}", removed);
95 // kill actors for the removed
96 removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
99 for (final Entry<DOMEntity, ORSet<String>> entry : filteredCandidates.entrySet()) {
100 activeListeners.computeIfAbsent(entry.getKey(), key -> {
101 // spawn actor for this entity
102 LOG.debug("Starting listener for {}", key);
103 return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
104 "SingleEntityListener-" + encodeEntityToActorName(key));
109 private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
110 LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp);
112 listener.ownershipChanged(rsp.getOwnershipChange());
116 private static String encodeEntityToActorName(final DOMEntity entity) {
117 return "type=" + entity.getType() + ",entity="
118 + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();