2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.registry.listener.owner;
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.LWWRegister;
17 import akka.cluster.ddata.LWWRegisterKey;
18 import akka.cluster.ddata.typed.javadsl.DistributedData;
19 import akka.cluster.ddata.typed.javadsl.Replicator;
20 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
21 import java.time.Duration;
22 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync;
23 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
24 import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged;
25 import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
26 import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
28 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
29 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data.
35 * Notifies the listener responsible for tracking the whole entity-type of changes.
37 public class SingleEntityListenerActor extends AbstractBehavior<ListenerCommand> {
38 private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class);
40 private final String localMember;
41 private final DOMEntity entity;
42 private final ActorRef<TypeListenerCommand> toNotify;
43 private final ReplicatorMessageAdapter<ListenerCommand, LWWRegister<String>> ownerReplicator;
45 private String currentOwner = "";
47 public SingleEntityListenerActor(final ActorContext<ListenerCommand> context, final String localMember,
48 final DOMEntity entity, final ActorRef<TypeListenerCommand> toNotify) {
50 this.localMember = localMember;
52 this.toNotify = toNotify;
54 final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
55 ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
57 ownerReplicator.askGet(
58 replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
59 InitialOwnerSync::new);
60 LOG.debug("OwnerListenerActor for {} started", entity.toString());
63 public static Behavior<ListenerCommand> create(final String localMember, final DOMEntity entity,
64 final ActorRef<TypeListenerCommand> toNotify) {
65 return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify));
69 public Receive<ListenerCommand> createReceive() {
70 return newReceiveBuilder()
71 .onMessage(OwnerChanged.class, this::onOwnerChanged)
72 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
76 private Behavior<ListenerCommand> onInitialOwnerSync(final InitialOwnerSync ownerSync) {
77 final Replicator.GetResponse<LWWRegister<String>> response = ownerSync.getResponse();
78 LOG.debug("Received initial sync response for: {}, response: {}", entity, response);
80 // only trigger initial notification when there is no owner present as we wont get a subscription callback
81 // when distributed-data does not have any data for a key
82 if (response instanceof Replicator.NotFound) {
84 // no data is present, trigger initial notification with no owner
85 triggerNoOwnerNotification();
86 } else if (response instanceof Replicator.GetSuccess) {
88 // when we get a success just let subscribe callback handle the initial notification
89 LOG.debug("Owner present for entity: {} at the time of initial sync.", entity);
91 LOG.warn("Get has failed for entity: {}", response);
94 // make sure to subscribe AFTER initial notification
95 ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new);
100 private void triggerNoOwnerNotification() {
101 LOG.debug("Triggering initial notification without an owner for: {}", entity);
103 toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
104 entity, EntityOwnershipChangeState.REMOTE_OWNERSHIP_LOST_NO_OWNER)));
107 private Behavior<ListenerCommand> onOwnerChanged(final OwnerChanged ownerChanged) {
109 final Replicator.SubscribeResponse<LWWRegister<String>> response = ownerChanged.getResponse();
110 if (response instanceof Replicator.Changed) {
112 final Replicator.Changed<LWWRegister<String>> registerChanged =
113 (Replicator.Changed<LWWRegister<String>>) response;
114 LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}",
115 entity, currentOwner, registerChanged.get(registerChanged.key()).getValue());
116 handleOwnerChange(registerChanged);
117 } else if (response instanceof Replicator.Deleted) {
118 handleOwnerLost((Replicator.Deleted<LWWRegister<String>>) response);
124 private void handleOwnerChange(final Replicator.Changed<LWWRegister<String>> changed) {
125 final String newOwner = changed.get(changed.key()).getValue();
127 final boolean wasOwner = currentOwner.equals(localMember);
128 final boolean isOwner = newOwner.equals(localMember);
129 final boolean hasOwner = !newOwner.equals("");
131 LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}",
132 entity, currentOwner, wasOwner, isOwner, hasOwner);
134 currentOwner = newOwner;
136 toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
137 entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner))));
140 private void handleOwnerLost(final Replicator.Deleted<LWWRegister<String>> changed) {
141 final boolean wasOwner = currentOwner.equals(localMember);
143 LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner);
146 toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
147 entity, EntityOwnershipChangeState.from(wasOwner, false, false))));