Remove JournalWriter.getLastEntry()
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSyncer.java
index a73a5620b39687beb411d8ee911f075b182276d7..32a0a643695154247e1c396d84a54951af2f3c97 100644 (file)
@@ -7,9 +7,10 @@
  */
 package org.opendaylight.controller.eos.akka.owner.supervisor;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.typed.ActorRef;
 import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
 import akka.actor.typed.javadsl.ActorContext;
 import akka.actor.typed.javadsl.Behaviors;
 import akka.actor.typed.javadsl.Receive;
@@ -20,18 +21,26 @@ import akka.cluster.ddata.ORSet;
 import akka.cluster.ddata.typed.javadsl.DistributedData;
 import akka.cluster.ddata.typed.javadsl.Replicator;
 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.pattern.StatusReply;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
  * sync has finished.
  */
-public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSyncer extends AbstractSupervisor {
     private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
 
     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
@@ -49,20 +58,22 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
 
     // String representation of Entity to DOMEntity
     private final Map<String, DOMEntity> entityLookup = new HashMap<>();
+    private final BindingInstanceIdentifierCodec iidCodec;
 
     private int toSync = -1;
 
     private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context,
-                        @Nullable final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
+                        final @Nullable ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
+                        final BindingInstanceIdentifierCodec iidCodec) {
         super(context);
+        this.iidCodec = requireNonNull(iidCodec);
         LOG.debug("Starting candidate and owner sync");
 
         final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
 
-        this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+        ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
 
-        new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
-            Duration.ofSeconds(5)).askGet(
+        candidateReplicator.askGet(
                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
                 InitialCandidateSync::new);
 
@@ -71,9 +82,9 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
         }
     }
 
-    public static Behavior<OwnerSupervisorCommand> create(
-            final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
-        return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted));
+    public static Behavior<OwnerSupervisorCommand> create(final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
+            final BindingInstanceIdentifierCodec iidCodec) {
+        return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted, iidCodec));
     }
 
     @Override
@@ -81,9 +92,21 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
         return newReceiveBuilder()
                 .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
                 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+                .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
+                .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+                .onMessage(ClearCandidates.class, this::finishClearCandidates)
                 .build();
     }
 
+    private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
+        LOG.debug("Failing rpc request. {}", message);
+        message.getReplyTo().tell(StatusReply.error(
+            "OwnerSupervisor is inactive so it cannot handle entity rpc requests."));
+        return this;
+    }
+
     private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
         if (response instanceof Replicator.GetSuccess) {
@@ -142,8 +165,7 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
     private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
         LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
                 currentCandidates, currentOwners);
-        return Behaviors.setup(ctx ->
-                OwnerSupervisor.create(currentCandidates, currentOwners));
+        return Behaviors.setup(ctx -> OwnerSupervisor.create(currentCandidates, currentOwners, iidCodec));
     }
 
     private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
@@ -156,4 +178,9 @@ public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand>
     private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
         LOG.debug("Owner not found. {}", rsp);
     }
+
+    @Override
+    Logger getLogger() {
+        return LOG;
+    }
 }