/*
 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.eos.akka.registry.listener.owner;
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.LWWRegister;
17 import akka.cluster.ddata.LWWRegisterKey;
18 import akka.cluster.ddata.typed.javadsl.DistributedData;
19 import akka.cluster.ddata.typed.javadsl.Replicator;
20 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
21 import java.time.Duration;
22 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync;
23 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
24 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged;
25 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
26 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
28 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data.
34 * Notifies the listener responsible for tracking the whole entity-type of changes.
36 public class SingleEntityListenerActor extends AbstractBehavior<ListenerCommand> {
37 private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class);
39 private final String localMember;
40 private final DOMEntity entity;
41 private final ActorRef<TypeListenerCommand> toNotify;
42 private final ReplicatorMessageAdapter<ListenerCommand, LWWRegister<String>> ownerReplicator;
44 private String currentOwner = "";
46 public SingleEntityListenerActor(final ActorContext<ListenerCommand> context, final String localMember,
47 final DOMEntity entity, final ActorRef<TypeListenerCommand> toNotify) {
49 this.localMember = localMember;
51 this.toNotify = toNotify;
53 final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
54 ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
56 ownerReplicator.askGet(
57 replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
58 InitialOwnerSync::new);
59 LOG.debug("OwnerListenerActor for {} started", entity.toString());
62 public static Behavior<ListenerCommand> create(final String localMember, final DOMEntity entity,
63 final ActorRef<TypeListenerCommand> toNotify) {
64 return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify));
68 public Receive<ListenerCommand> createReceive() {
69 return newReceiveBuilder()
70 .onMessage(OwnerChanged.class, this::onOwnerChanged)
71 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
75 private Behavior<ListenerCommand> onInitialOwnerSync(final InitialOwnerSync ownerSync) {
76 final Replicator.GetResponse<LWWRegister<String>> response = ownerSync.getResponse();
77 LOG.debug("Received initial sync response for: {}, response: {}", entity, response);
79 // only trigger initial notification when there is no owner present as we wont get a subscription callback
80 // when distributed-data does not have any data for a key
81 if (response instanceof Replicator.NotFound) {
83 // no data is present, trigger initial notification with no owner
84 triggerNoOwnerNotification();
85 } else if (response instanceof Replicator.GetSuccess) {
87 // when we get a success just let subscribe callback handle the initial notification
88 LOG.debug("Owner present for entity: {} at the time of initial sync.", entity);
90 LOG.warn("Get has failed for entity: {}", response);
93 // make sure to subscribe AFTER initial notification
94 ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new);
99 private void triggerNoOwnerNotification() {
100 LOG.debug("Triggering initial notification without an owner for: {}", entity);
101 toNotify.tell(new EntityOwnerChanged(entity, EntityOwnershipStateChange.REMOTE_OWNERSHIP_LOST_NO_OWNER, false));
104 private Behavior<ListenerCommand> onOwnerChanged(final OwnerChanged ownerChanged) {
106 final Replicator.SubscribeResponse<LWWRegister<String>> response = ownerChanged.getResponse();
107 if (response instanceof Replicator.Changed) {
109 final Replicator.Changed<LWWRegister<String>> registerChanged =
110 (Replicator.Changed<LWWRegister<String>>) response;
111 LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}",
112 entity, currentOwner, registerChanged.get(registerChanged.key()).getValue());
113 handleOwnerChange(registerChanged);
114 } else if (response instanceof Replicator.Deleted) {
115 handleOwnerLost((Replicator.Deleted<LWWRegister<String>>) response);
121 private void handleOwnerChange(final Replicator.Changed<LWWRegister<String>> changed) {
122 final String newOwner = changed.get(changed.key()).getValue();
124 final boolean wasOwner = currentOwner.equals(localMember);
125 final boolean isOwner = newOwner.equals(localMember);
126 final boolean hasOwner = !newOwner.equals("");
128 LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}",
129 entity, currentOwner, wasOwner, isOwner, hasOwner);
131 currentOwner = newOwner;
133 toNotify.tell(new EntityOwnerChanged(entity, EntityOwnershipStateChange.from(wasOwner, isOwner, hasOwner),
137 private void handleOwnerLost(final Replicator.Deleted<LWWRegister<String>> changed) {
138 final boolean wasOwner = currentOwner.equals(localMember);
140 LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner);
143 toNotify.tell(new EntityOwnerChanged(entity, EntityOwnershipStateChange.from(wasOwner, false, false), false));