17391fac91265a7da322b385a4bd852c4e20b78e
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / listener / type / EntityTypeListenerActor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.listener.type;
9
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ddata.ORMap;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator.Changed;
import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
41
42 public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
43     private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);
44
45     private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
46     private final String localMember;
47     private final String entityType;
48     private final DOMEntityOwnershipListener listener;
49
50     public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
51                                    final String entityType, final DOMEntityOwnershipListener listener) {
52         super(context);
53         this.localMember = localMember;
54         this.entityType = entityType;
55         this.listener = listener;
56
57         new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
58             DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
59                 .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
60     }
61
62     public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
63                                                        final DOMEntityOwnershipListener listener) {
64         return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
65     }
66
67     @Override
68     public Receive<TypeListenerCommand> createReceive() {
69         return newReceiveBuilder()
70                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
71                 .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
72                 .build();
73     }
74
75     private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
76         final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
77         if (response instanceof Changed) {
78             processCandidates(((Changed<ORMap<DOMEntity, ORSet<String>>>) response).get(response.key()).getEntries());
79         } else {
80             LOG.warn("Unexpected notification from replicator: {}", response);
81         }
82         return this;
83     }
84
85     private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
86         final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
87             .filter(entry -> entry.getKey().getType().equals(entityType))
88             .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
89         LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);
90
91         final Set<DOMEntity> removed =
92                 ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
93         if (!removed.isEmpty()) {
94             LOG.debug("Stopping listeners for {}", removed);
95             // kill actors for the removed
96             removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
97         }
98
99         for (final Entry<DOMEntity, ORSet<String>> entry : filteredCandidates.entrySet()) {
100             activeListeners.computeIfAbsent(entry.getKey(), key -> {
101                 // spawn actor for this entity
102                 LOG.debug("Starting listener for {}", key);
103                 return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
104                     "SingleEntityListener-" + encodeEntityToActorName(key));
105             });
106         }
107     }
108
109     private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
110         LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp);
111
112         listener.ownershipChanged(rsp.getOwnershipChange());
113         return this;
114     }
115
116     private static String encodeEntityToActorName(final DOMEntity entity) {
117         return "type=" + entity.getType() + ",entity="
118                 + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();
119     }
120 }