Add an actor for entity rpc execution.
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / checker / OwnerStateChecker.java
index 57c40aca7854c13f3dcb420836777260125d0794..6d418a80e7d9b873988b4e5f0a97acfe6ca279ee 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.eos.akka.owner.checker;
 
 import static java.util.Objects.requireNonNull;
 
+import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
 import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
@@ -17,6 +18,7 @@ import akka.actor.typed.javadsl.Receive;
 import akka.cluster.ddata.LWWRegister;
 import akka.cluster.ddata.LWWRegisterKey;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
+import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.Replicator.Get;
 import akka.cluster.ddata.typed.javadsl.Replicator.GetFailure;
 import akka.cluster.ddata.typed.javadsl.Replicator.GetResponse;
@@ -25,10 +27,16 @@ import akka.cluster.ddata.typed.javadsl.Replicator.NotFound;
 import akka.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
 import java.time.Duration;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntitiesRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityOwnerRequest;
+import org.opendaylight.controller.eos.akka.owner.checker.command.GetEntityRequest;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipState;
 import org.opendaylight.controller.eos.akka.owner.checker.command.GetOwnershipStateReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.InternalGetReply;
 import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerCommand;
+import org.opendaylight.controller.eos.akka.owner.checker.command.StateCheckerRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,18 +46,28 @@ public final class OwnerStateChecker extends AbstractBehavior<StateCheckerComman
     private static final Duration GET_OWNERSHIP_TIMEOUT = Duration.ofSeconds(5);
     private static final Duration UNEXPECTED_ASK_TIMEOUT = Duration.ofSeconds(5);
 
-    private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> replicatorAdapter;
+    private final ReplicatorMessageAdapter<StateCheckerCommand, LWWRegister<String>> ownerReplicator;
+    private final ActorRef<OwnerSupervisorCommand> ownerSupervisor;
+    private final BindingInstanceIdentifierCodec iidCodec;
+    private final ActorRef<Replicator.Command> replicator;
     private final String localMember;
 
-    private OwnerStateChecker(final ActorContext<StateCheckerCommand> context, final String localMember) {
+    private OwnerStateChecker(final ActorContext<StateCheckerCommand> context,
+                              final String localMember,
+                              final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                              final BindingInstanceIdentifierCodec iidCodec) {
         super(context);
         this.localMember = requireNonNull(localMember);
-        replicatorAdapter = new ReplicatorMessageAdapter<>(context,
-            DistributedData.get(context.getSystem()).replicator(), UNEXPECTED_ASK_TIMEOUT);
+        this.ownerSupervisor = requireNonNull(ownerSupervisor);
+        this.iidCodec = requireNonNull(iidCodec);
+        replicator = DistributedData.get(context.getSystem()).replicator();
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, UNEXPECTED_ASK_TIMEOUT);
     }
 
-    public static Behavior<StateCheckerCommand> create(final String localMember) {
-        return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember));
+    public static Behavior<StateCheckerCommand> create(final String localMember,
+                                                       final ActorRef<OwnerSupervisorCommand> ownerSupervisor,
+                                                       final BindingInstanceIdentifierCodec iidCodec) {
+        return Behaviors.setup(ctx -> new OwnerStateChecker(ctx, localMember, ownerSupervisor, iidCodec));
     }
 
     @Override
@@ -57,11 +75,14 @@ public final class OwnerStateChecker extends AbstractBehavior<StateCheckerComman
         return newReceiveBuilder()
                 .onMessage(GetOwnershipState.class, this::onGetOwnershipState)
                 .onMessage(InternalGetReply.class, this::respondWithState)
+                .onMessage(GetEntitiesRequest.class, this::executeEntityRpc)
+                .onMessage(GetEntityRequest.class, this::executeEntityRpc)
+                .onMessage(GetEntityOwnerRequest.class, this::executeEntityRpc)
                 .build();
     }
 
     private Behavior<StateCheckerCommand> onGetOwnershipState(final GetOwnershipState message) {
-        replicatorAdapter.askGet(
+        ownerReplicator.askGet(
                 askReplyTo -> new Get<>(
                         new LWWRegisterKey<>(message.getEntity().toString()),
                         new ReadMajority(GET_OWNERSHIP_TIMEOUT),
@@ -90,4 +111,13 @@ public final class OwnerStateChecker extends AbstractBehavior<StateCheckerComman
         }
         return this;
     }
+
+    private Behavior<StateCheckerCommand> executeEntityRpc(final StateCheckerRequest request) {
+        final ActorRef<StateCheckerCommand> rpcHandler =
+                getContext().spawnAnonymous(EntityRpcHandler.create(ownerSupervisor, iidCodec));
+
+        LOG.debug("Executing entity rpc: {} in actor: {}", request, rpcHandler);
+        rpcHandler.tell(request);
+        return this;
+    }
 }