/* * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.eos.akka.registry.listener.owner; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.ddata.LWWRegister; import akka.cluster.ddata.LWWRegisterKey; import akka.cluster.ddata.typed.javadsl.DistributedData; import akka.cluster.ddata.typed.javadsl.Replicator; import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; import java.time.Duration; import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync; import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand; import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged; import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged; import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand; import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data. * Notifies the listener responsible for tracking the whole entity-type of changes. */ public class SingleEntityListenerActor extends AbstractBehavior { private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class); private final String localMember; private final DOMEntity entity; private final ActorRef toNotify; private final ReplicatorMessageAdapter> ownerReplicator; private String currentOwner = ""; public SingleEntityListenerActor(final ActorContext context, final String localMember, final DOMEntity entity, final ActorRef toNotify) { super(context); this.localMember = localMember; this.entity = entity; this.toNotify = toNotify; final ActorRef replicator = DistributedData.get(context.getSystem()).replicator(); ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5)); ownerReplicator.askGet( replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo), InitialOwnerSync::new); LOG.debug("OwnerListenerActor for {} started", entity.toString()); } public static Behavior create(final String localMember, final DOMEntity entity, final ActorRef toNotify) { return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify)); } @Override public Receive createReceive() { return newReceiveBuilder() .onMessage(OwnerChanged.class, this::onOwnerChanged) .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync) .build(); } private Behavior onInitialOwnerSync(final InitialOwnerSync ownerSync) { final Replicator.GetResponse> response = ownerSync.getResponse(); LOG.debug("Received initial sync response for: {}, response: {}", entity, response); // only trigger initial notification when there is no owner present as we wont get a subscription callback // when distributed-data does not have any data for a key if (response instanceof Replicator.NotFound) { // no data is present, trigger initial notification with no owner triggerNoOwnerNotification(); } else if (response instanceof Replicator.GetSuccess) { // when we get a success just let subscribe callback handle the initial notification LOG.debug("Owner present for entity: {} at the time of initial sync.", entity); } else { LOG.warn("Get has failed for entity: {}", response); } // make sure to subscribe AFTER initial notification ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new); return this; } private void triggerNoOwnerNotification() { LOG.debug("Triggering initial notification without an owner for: {}", entity); toNotify.tell(new EntityOwnerChanged(entity, EntityOwnershipStateChange.REMOTE_OWNERSHIP_LOST_NO_OWNER, false)); } private Behavior onOwnerChanged(final OwnerChanged ownerChanged) { final Replicator.SubscribeResponse> response = ownerChanged.getResponse(); if (response instanceof Replicator.Changed) { final Replicator.Changed> registerChanged = (Replicator.Changed>) response; LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}", entity, currentOwner, registerChanged.get(registerChanged.key()).getValue()); handleOwnerChange(registerChanged); } else if (response instanceof Replicator.Deleted) { handleOwnerLost((Replicator.Deleted>) response); } return this; } private void handleOwnerChange(final Replicator.Changed> changed) { final String newOwner = changed.get(changed.key()).getValue(); final boolean wasOwner = currentOwner.equals(localMember); final boolean isOwner = newOwner.equals(localMember); final boolean hasOwner = !newOwner.equals(""); LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}", entity, currentOwner, wasOwner, isOwner, hasOwner); currentOwner = newOwner; toNotify.tell(new EntityOwnerChanged(entity, EntityOwnershipStateChange.from(wasOwner, isOwner, hasOwner), false)); } private void handleOwnerLost(final Replicator.Deleted> changed) { final boolean wasOwner = currentOwner.equals(localMember); LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner); currentOwner = ""; toNotify.tell(new EntityOwnerChanged(entity, EntityOwnershipStateChange.from(wasOwner, false, false), false)); } }