Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / listener / type / EntityTypeListenerActor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.listener.type;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.ORMap;
17 import akka.cluster.ddata.ORSet;
18 import akka.cluster.ddata.typed.javadsl.DistributedData;
19 import akka.cluster.ddata.typed.javadsl.Replicator.Changed;
20 import akka.cluster.ddata.typed.javadsl.Replicator.SubscribeResponse;
21 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
22 import com.google.common.collect.ImmutableSet;
23 import com.google.common.collect.Sets;
24 import java.time.Duration;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Set;
29 import java.util.UUID;
30 import java.util.stream.Collectors;
31 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
32 import org.opendaylight.controller.eos.akka.registry.listener.owner.SingleEntityListenerActor;
33 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
34 import org.opendaylight.controller.eos.akka.registry.listener.type.command.CandidatesChanged;
35 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
36 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TerminateListener;
37 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
38 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
39 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public class EntityTypeListenerActor extends AbstractBehavior<TypeListenerCommand> {
44     private static final Logger LOG = LoggerFactory.getLogger(EntityTypeListenerActor.class);
45
46     private final Map<DOMEntity, ActorRef<ListenerCommand>> activeListeners = new HashMap<>();
47     private final String localMember;
48     private final String entityType;
49     private final DOMEntityOwnershipListener listener;
50
51     public EntityTypeListenerActor(final ActorContext<TypeListenerCommand> context, final String localMember,
52                                    final String entityType, final DOMEntityOwnershipListener listener) {
53         super(context);
54         this.localMember = localMember;
55         this.entityType = entityType;
56         this.listener = listener;
57
58         new ReplicatorMessageAdapter<TypeListenerCommand, ORMap<DOMEntity, ORSet<String>>>(context,
59             DistributedData.get(context.getSystem()).replicator(), Duration.ofSeconds(5))
60                 .subscribe(CandidateRegistry.KEY, CandidatesChanged::new);
61     }
62
63     public static Behavior<TypeListenerCommand> create(final String localMember, final String entityType,
64                                                        final DOMEntityOwnershipListener listener) {
65         return Behaviors.setup(ctx -> new EntityTypeListenerActor(ctx, localMember, entityType, listener));
66     }
67
68     @Override
69     public Receive<TypeListenerCommand> createReceive() {
70         return newReceiveBuilder()
71                 .onMessage(CandidatesChanged.class, this::onCandidatesChanged)
72                 .onMessage(EntityOwnerChanged.class, this::onOwnerChanged)
73                 .onMessage(TerminateListener.class, this::onTerminate)
74                 .build();
75     }
76
77     private Behavior<TypeListenerCommand> onCandidatesChanged(final CandidatesChanged notification) {
78         final SubscribeResponse<ORMap<DOMEntity, ORSet<String>>> response = notification.getResponse();
79         if (response instanceof Changed) {
80             processCandidates(((Changed<ORMap<DOMEntity, ORSet<String>>>) response).get(response.key()).getEntries());
81         } else {
82             LOG.warn("Unexpected notification from replicator: {}", response);
83         }
84         return this;
85     }
86
87     private void processCandidates(final Map<DOMEntity, ORSet<String>> entries) {
88         final Map<DOMEntity, ORSet<String>> filteredCandidates = entries.entrySet().stream()
89             .filter(entry -> entry.getKey().getType().equals(entityType))
90             .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
91         LOG.debug("Entity-type: {} current candidates: {}", entityType, filteredCandidates);
92
93         final Set<DOMEntity> removed =
94                 ImmutableSet.copyOf(Sets.difference(activeListeners.keySet(), filteredCandidates.keySet()));
95         if (!removed.isEmpty()) {
96             LOG.debug("Stopping listeners for {}", removed);
97             // kill actors for the removed
98             removed.forEach(removedEntity -> getContext().stop(activeListeners.remove(removedEntity)));
99         }
100
101         for (final Entry<DOMEntity, ORSet<String>> entry : filteredCandidates.entrySet()) {
102             activeListeners.computeIfAbsent(entry.getKey(), key -> {
103                 // spawn actor for this entity
104                 LOG.debug("Starting listener for {}", key);
105                 return getContext().spawn(SingleEntityListenerActor.create(localMember, key, getContext().getSelf()),
106                     "SingleEntityListener-" + encodeEntityToActorName(key));
107             });
108         }
109     }
110
111     private Behavior<TypeListenerCommand> onOwnerChanged(final EntityOwnerChanged rsp) {
112         LOG.debug("{} : Entity-type: {} listener, owner change: {}", localMember, entityType, rsp);
113         listener.ownershipChanged(rsp.entity(), rsp.change(), false);
114         return this;
115     }
116
117     private Behavior<TypeListenerCommand> onTerminate(final TerminateListener command) {
118         LOG.debug("Terminating listener for type: {}, listener: {}", entityType, listener);
119         return Behaviors.stopped();
120     }
121
122     private static String encodeEntityToActorName(final DOMEntity entity) {
123         return "type=" + entity.getType() + ",entity="
124                 + entity.getIdentifier().getLastPathArgument().getNodeType().getLocalName() + "-" + UUID.randomUUID();
125     }
126 }