*/
package org.opendaylight.controller.eos.akka.owner.supervisor;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
-import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
+import akka.pattern.StatusReply;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
* sync has finished.
*/
-public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
+public final class OwnerSyncer extends AbstractSupervisor {
private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
// String representation of Entity to DOMEntity
private final Map<String, DOMEntity> entityLookup = new HashMap<>();
+ private final BindingInstanceIdentifierCodec iidCodec;
private int toSync = -1;
private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context,
- @Nullable final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
+ final @Nullable ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
+ final BindingInstanceIdentifierCodec iidCodec) {
super(context);
+ this.iidCodec = requireNonNull(iidCodec);
LOG.debug("Starting candidate and owner sync");
final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
- this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
+ ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
- new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
- Duration.ofSeconds(5)).askGet(
+ candidateReplicator.askGet(
askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
InitialCandidateSync::new);
}
}
- public static Behavior<OwnerSupervisorCommand> create(
- final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted) {
- return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted));
+ public static Behavior<OwnerSupervisorCommand> create(final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
+ final BindingInstanceIdentifierCodec iidCodec) {
+ return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted, iidCodec));
}
@Override
return newReceiveBuilder()
.onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
.onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
+ .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
+ .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
+ .onMessage(ClearCandidates.class, this::finishClearCandidates)
.build();
}
+ private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
+ LOG.debug("Failing rpc request. {}", message);
+ message.getReplyTo().tell(StatusReply.error(
+ "OwnerSupervisor is inactive so it cannot handle entity rpc requests."));
+ return this;
+ }
+
private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
if (response instanceof Replicator.GetSuccess) {
private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
currentCandidates, currentOwners);
- return Behaviors.setup(ctx ->
- OwnerSupervisor.create(currentCandidates, currentOwners));
+ return Behaviors.setup(ctx -> OwnerSupervisor.create(currentCandidates, currentOwners, iidCodec));
}
private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
LOG.debug("Owner not found. {}", rsp);
}
+
+ @Override
+ Logger getLogger() {
+ return LOG;
+ }
}