X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Feos-dom-akka%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Feos%2Fakka%2Fowner%2Fsupervisor%2FOwnerSyncer.java;h=092f532dfbd06eb6a0097aef157627c4dfd19752;hb=1adec580405aafae353f8b0b3a5a0f474a05c6c0;hp=1a8df09f1d5b68cc1f3b08afa2cf809d147ccd71;hpb=e1e6d8e34fd4c5c5c07c7a8063ffa94a8dbe2062;p=controller.git diff --git a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java index 1a8df09f1d..092f532dfb 100644 --- a/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java +++ b/opendaylight/md-sal/eos-dom-akka/src/main/java/org/opendaylight/controller/eos/akka/owner/supervisor/OwnerSyncer.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.eos.akka.owner.supervisor; +import static java.util.Objects.requireNonNull; + import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AbstractBehavior; @@ -20,15 +22,24 @@ import akka.cluster.ddata.ORSet; import akka.cluster.ddata.typed.javadsl.DistributedData; import akka.cluster.ddata.typed.javadsl.Replicator; import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter; +import akka.pattern.StatusReply; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest; import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync; import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync; import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply; +import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest; import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec; import org.opendaylight.mdsal.eos.dom.api.DOMEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,25 +57,34 @@ public final class OwnerSyncer extends AbstractBehavior // String representation of Entity to DOMEntity private final Map entityLookup = new HashMap<>(); + private final BindingInstanceIdentifierCodec iidCodec; private int toSync = -1; - private OwnerSyncer(final ActorContext context) { + private OwnerSyncer(final ActorContext context, + final @Nullable ActorRef notifyDatacenterStarted, + final BindingInstanceIdentifierCodec iidCodec) { super(context); + this.iidCodec = requireNonNull(iidCodec); LOG.debug("Starting candidate and owner sync"); final ActorRef replicator = DistributedData.get(context.getSystem()).replicator(); - this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5)); + ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5)); new ReplicatorMessageAdapter>>(context, replicator, Duration.ofSeconds(5)).askGet( askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo), InitialCandidateSync::new); + + if (notifyDatacenterStarted != null) { + notifyDatacenterStarted.tell(DataCenterActivated.INSTANCE); + } } - public static Behavior create() { - return Behaviors.setup(OwnerSyncer::new); + public static Behavior create(final ActorRef notifyDatacenterStarted, + final BindingInstanceIdentifierCodec iidCodec) { + return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted, iidCodec)); } @Override @@ -72,9 +92,19 @@ public final class OwnerSyncer extends AbstractBehavior return newReceiveBuilder() .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync) .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync) + .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc) + .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc) + .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc) .build(); } + private Behavior onFailEntityRpc(final OwnerSupervisorRequest message) { + LOG.debug("Failing rpc request. {}", message); + message.getReplyTo().tell(StatusReply.error( + "OwnerSupervisor is inactive so it cannot handle entity rpc requests.")); + return this; + } + private Behavior onInitialCandidateSync(final InitialCandidateSync rsp) { final Replicator.GetResponse>> response = rsp.getResponse(); if (response instanceof Replicator.GetSuccess) { @@ -133,8 +163,7 @@ public final class OwnerSyncer extends AbstractBehavior private Behavior switchToSupervisor() { LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}", currentCandidates, currentOwners); - return Behaviors.setup(ctx -> - OwnerSupervisor.create(currentCandidates, currentOwners)); + return Behaviors.setup(ctx -> OwnerSupervisor.create(currentCandidates, currentOwners, iidCodec)); } private void handleOwnerRsp(final Replicator.GetSuccess> rsp) {