Introduce DOMEntityOwnershipService replacement
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / listener / owner / SingleEntityListenerActor.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.registry.listener.owner;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.LWWRegister;
17 import akka.cluster.ddata.LWWRegisterKey;
18 import akka.cluster.ddata.typed.javadsl.DistributedData;
19 import akka.cluster.ddata.typed.javadsl.Replicator;
20 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
21 import java.time.Duration;
22 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync;
23 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
24 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged;
25 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
26 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
29 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data.
35  * Notifies the listener responsible for tracking the whole entity-type of changes.
36  */
37 public class SingleEntityListenerActor extends AbstractBehavior<ListenerCommand> {
38     private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class);
39
40     private final String localMember;
41     private final DOMEntity entity;
42     private final ActorRef<TypeListenerCommand> toNotify;
43     private final ReplicatorMessageAdapter<ListenerCommand, LWWRegister<String>> ownerReplicator;
44
45     private String currentOwner = "";
46
47     public SingleEntityListenerActor(final ActorContext<ListenerCommand> context, final String localMember,
48                                      final DOMEntity entity, final ActorRef<TypeListenerCommand> toNotify) {
49         super(context);
50         this.localMember = localMember;
51         this.entity = entity;
52         this.toNotify = toNotify;
53
54         final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
55         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
56
57         ownerReplicator.askGet(
58             replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
59             InitialOwnerSync::new);
60         LOG.debug("OwnerListenerActor for {} started", entity.toString());
61     }
62
63     public static Behavior<ListenerCommand> create(final String localMember, final DOMEntity entity,
64                                                    final ActorRef<TypeListenerCommand> toNotify) {
65         return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify));
66     }
67
68     @Override
69     public Receive<ListenerCommand> createReceive() {
70         return newReceiveBuilder()
71                 .onMessage(OwnerChanged.class, this::onOwnerChanged)
72                 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
73                 .build();
74     }
75
76     private Behavior<ListenerCommand> onInitialOwnerSync(final InitialOwnerSync ownerSync) {
77         final Replicator.GetResponse<LWWRegister<String>> response = ownerSync.getResponse();
78         LOG.debug("Received initial sync response for: {}, response: {}", entity, response);
79
80         // only trigger initial notification when there is no owner present as we wont get a subscription callback
81         // when distributed-data does not have any data for a key
82         if (response instanceof Replicator.NotFound) {
83
84             // no data is present, trigger initial notification with no owner
85             triggerNoOwnerNotification();
86         } else if (response instanceof Replicator.GetSuccess) {
87
88             // when we get a success just let subscribe callback handle the initial notification
89             LOG.debug("Owner present for entity: {} at the time of initial sync.", entity);
90         } else {
91             LOG.warn("Get has failed for entity: {}", response);
92         }
93
94         // make sure to subscribe AFTER initial notification
95         ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new);
96
97         return this;
98     }
99
100     private void triggerNoOwnerNotification() {
101         LOG.debug("Triggering initial notification without an owner for: {}", entity);
102
103         toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
104                 entity, EntityOwnershipChangeState.REMOTE_OWNERSHIP_LOST_NO_OWNER)));
105     }
106
107     private Behavior<ListenerCommand> onOwnerChanged(final OwnerChanged ownerChanged) {
108
109         final Replicator.SubscribeResponse<LWWRegister<String>> response = ownerChanged.getResponse();
110         if (response instanceof Replicator.Changed) {
111
112             final Replicator.Changed<LWWRegister<String>> registerChanged =
113                     (Replicator.Changed<LWWRegister<String>>) response;
114             LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}",
115                     entity, currentOwner, registerChanged.get(registerChanged.key()).getValue());
116             handleOwnerChange(registerChanged);
117         } else if (response instanceof Replicator.Deleted) {
118             handleOwnerLost((Replicator.Deleted<LWWRegister<String>>) response);
119         }
120
121         return this;
122     }
123
124     private void handleOwnerChange(final Replicator.Changed<LWWRegister<String>> changed) {
125         final String newOwner = changed.get(changed.key()).getValue();
126
127         final boolean wasOwner = currentOwner.equals(localMember);
128         final boolean isOwner = newOwner.equals(localMember);
129         final boolean hasOwner = !newOwner.equals("");
130
131         LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}",
132                 entity, currentOwner, wasOwner, isOwner, hasOwner);
133
134         currentOwner = newOwner;
135
136         toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
137                 entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner))));
138     }
139
140     private void handleOwnerLost(final Replicator.Deleted<LWWRegister<String>> changed) {
141         final boolean wasOwner = currentOwner.equals(localMember);
142
143         LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner);
144
145         currentOwner = "";
146         toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
147                 entity, EntityOwnershipChangeState.from(wasOwner, false, false))));
148     }
149 }