Introduce DOMEntityOwnershipService replacement
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / registry / listener / owner / SingleEntityListenerActor.java
diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/registry/listener/owner/SingleEntityListenerActor.java
new file mode 100644 (file)
index 0000000..279ee8f
--- /dev/null
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.eos.akka.registry.listener.owner;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.ddata.LWWRegister;
+import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
+import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import java.time.Duration;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.InitialOwnerSync;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.ListenerCommand;
+import org.opendaylight.controller.eos.akka.registry.listener.owner.command.OwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.EntityOwnerChanged;
+import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerCommand;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Keeps track of owners for a single entity, which is mapped to a single LWWRegister in distributed-data.
+ * Notifies the listener responsible for tracking the whole entity-type of changes.
+ */
+public class SingleEntityListenerActor extends AbstractBehavior<ListenerCommand> {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleEntityListenerActor.class);
+
+    private final String localMember;
+    private final DOMEntity entity;
+    private final ActorRef<TypeListenerCommand> toNotify;
+    private final ReplicatorMessageAdapter<ListenerCommand, LWWRegister<String>> ownerReplicator;
+
+    private String currentOwner = "";
+
+    public SingleEntityListenerActor(final ActorContext<ListenerCommand> context, final String localMember,
+                                     final DOMEntity entity, final ActorRef<TypeListenerCommand> toNotify) {
+        super(context);
+        this.localMember = localMember;
+        this.entity = entity;
+        this.toNotify = toNotify;
+
+        final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+
+        ownerReplicator.askGet(
+            replyTo -> new Replicator.Get<>(new LWWRegisterKey<>(entity.toString()), Replicator.readLocal(), replyTo),
+            InitialOwnerSync::new);
+        LOG.debug("OwnerListenerActor for {} started", entity.toString());
+    }
+
+    public static Behavior<ListenerCommand> create(final String localMember, final DOMEntity entity,
+                                                   final ActorRef<TypeListenerCommand> toNotify) {
+        return Behaviors.setup(ctx -> new SingleEntityListenerActor(ctx, localMember, entity, toNotify));
+    }
+
+    @Override
+    public Receive<ListenerCommand> createReceive() {
+        return newReceiveBuilder()
+                .onMessage(OwnerChanged.class, this::onOwnerChanged)
+                .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+                .build();
+    }
+
+    private Behavior<ListenerCommand> onInitialOwnerSync(final InitialOwnerSync ownerSync) {
+        final Replicator.GetResponse<LWWRegister<String>> response = ownerSync.getResponse();
+        LOG.debug("Received initial sync response for: {}, response: {}", entity, response);
+
+        // only trigger initial notification when there is no owner present as we wont get a subscription callback
+        // when distributed-data does not have any data for a key
+        if (response instanceof Replicator.NotFound) {
+
+            // no data is present, trigger initial notification with no owner
+            triggerNoOwnerNotification();
+        } else if (response instanceof Replicator.GetSuccess) {
+
+            // when we get a success just let subscribe callback handle the initial notification
+            LOG.debug("Owner present for entity: {} at the time of initial sync.", entity);
+        } else {
+            LOG.warn("Get has failed for entity: {}", response);
+        }
+
+        // make sure to subscribe AFTER initial notification
+        ownerReplicator.subscribe(new LWWRegisterKey<>(entity.toString()), OwnerChanged::new);
+
+        return this;
+    }
+
+    private void triggerNoOwnerNotification() {
+        LOG.debug("Triggering initial notification without an owner for: {}", entity);
+
+        toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+                entity, EntityOwnershipChangeState.REMOTE_OWNERSHIP_LOST_NO_OWNER)));
+    }
+
+    private Behavior<ListenerCommand> onOwnerChanged(final OwnerChanged ownerChanged) {
+
+        final Replicator.SubscribeResponse<LWWRegister<String>> response = ownerChanged.getResponse();
+        if (response instanceof Replicator.Changed) {
+
+            final Replicator.Changed<LWWRegister<String>> registerChanged =
+                    (Replicator.Changed<LWWRegister<String>>) response;
+            LOG.debug("Owner changed for: {}, prevOwner: {}, newOwner: {}",
+                    entity, currentOwner, registerChanged.get(registerChanged.key()).getValue());
+            handleOwnerChange(registerChanged);
+        } else if (response instanceof Replicator.Deleted) {
+            handleOwnerLost((Replicator.Deleted<LWWRegister<String>>) response);
+        }
+
+        return this;
+    }
+
+    private void handleOwnerChange(final Replicator.Changed<LWWRegister<String>> changed) {
+        final String newOwner = changed.get(changed.key()).getValue();
+
+        final boolean wasOwner = currentOwner.equals(localMember);
+        final boolean isOwner = newOwner.equals(localMember);
+        final boolean hasOwner = !newOwner.equals("");
+
+        LOG.debug("Owner changed for entity:{}, currentOwner: {}, wasOwner: {}, isOwner: {}, hasOwner:{}",
+                entity, currentOwner, wasOwner, isOwner, hasOwner);
+
+        currentOwner = newOwner;
+
+        toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+                entity, EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner))));
+    }
+
+    private void handleOwnerLost(final Replicator.Deleted<LWWRegister<String>> changed) {
+        final boolean wasOwner = currentOwner.equals(localMember);
+
+        LOG.debug("Owner lost for entity:{}, currentOwner: {}, wasOwner: {}", entity, currentOwner, wasOwner);
+
+        currentOwner = "";
+        toNotify.tell(new EntityOwnerChanged(new DOMEntityOwnershipChange(
+                entity, EntityOwnershipChangeState.from(wasOwner, false, false))));
+    }
+}