2 * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.typed.ActorRef;
13 import akka.actor.typed.Behavior;
14 import akka.actor.typed.javadsl.AbstractBehavior;
15 import akka.actor.typed.javadsl.ActorContext;
16 import akka.actor.typed.javadsl.Behaviors;
17 import akka.actor.typed.javadsl.Receive;
18 import akka.cluster.ddata.LWWRegister;
19 import akka.cluster.ddata.LWWRegisterKey;
20 import akka.cluster.ddata.ORMap;
21 import akka.cluster.ddata.ORSet;
22 import akka.cluster.ddata.typed.javadsl.DistributedData;
23 import akka.cluster.ddata.typed.javadsl.Replicator;
24 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
25 import akka.pattern.StatusReply;
26 import java.time.Duration;
27 import java.util.HashMap;
28 import java.util.HashSet;
31 import org.eclipse.jdt.annotation.Nullable;
32 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
33 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
34 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
35 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
36 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
37 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
38 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
39 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
41 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
42 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
43 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
51 public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
52 private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
54 private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
55 private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
56 private final Map<DOMEntity, String> currentOwners = new HashMap<>();
58 // String representation of Entity to DOMEntity
59 private final Map<String, DOMEntity> entityLookup = new HashMap<>();
60 private final BindingInstanceIdentifierCodec iidCodec;
62 private int toSync = -1;
64 private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context,
65 final @Nullable ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
66 final BindingInstanceIdentifierCodec iidCodec) {
68 this.iidCodec = requireNonNull(iidCodec);
69 LOG.debug("Starting candidate and owner sync");
71 final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
73 ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
75 new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
76 Duration.ofSeconds(5)).askGet(
77 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
78 InitialCandidateSync::new);
80 if (notifyDatacenterStarted != null) {
81 notifyDatacenterStarted.tell(DataCenterActivated.INSTANCE);
85 public static Behavior<OwnerSupervisorCommand> create(final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
86 final BindingInstanceIdentifierCodec iidCodec) {
87 return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted, iidCodec));
91 public Receive<OwnerSupervisorCommand> createReceive() {
92 return newReceiveBuilder()
93 .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
94 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
95 .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
96 .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
97 .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
101 private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
102 LOG.debug("Failing rpc request. {}", message);
103 message.getReplyTo().tell(StatusReply.error(
104 "OwnerSupervisor is inactive so it cannot handle entity rpc requests."));
108 private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
109 final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
110 if (response instanceof Replicator.GetSuccess) {
111 return doInitialSync((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
112 } else if (response instanceof Replicator.NotFound) {
113 LOG.debug("No candidates found switching to supervisor");
114 return switchToSupervisor();
116 LOG.debug("Initial candidate sync failed, switching to supervisor. Sync reply: {}", response);
117 return switchToSupervisor();
121 private Behavior<OwnerSupervisorCommand> doInitialSync(
122 final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
124 final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
125 candidates.getEntries().entrySet().forEach(entry -> {
126 currentCandidates.put(entry.getKey(), new HashSet<>(entry.getValue().getElements()));
129 toSync = candidates.keys().size();
130 for (final DOMEntity entity : candidates.keys().getElements()) {
131 entityLookup.put(entity.toString(), entity);
133 ownerReplicator.askGet(
134 askReplyTo -> new Replicator.Get<>(
135 new LWWRegisterKey<>(entity.toString()),
136 Replicator.readLocal(),
138 InitialOwnerSync::new);
144 private Behavior<OwnerSupervisorCommand> onInitialOwnerSync(final InitialOwnerSync rsp) {
145 final Replicator.GetResponse<LWWRegister<String>> response = rsp.getResponse();
146 if (response instanceof Replicator.GetSuccess) {
147 handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
148 } else if (response instanceof Replicator.NotFound) {
149 handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
151 LOG.debug("Initial sync failed response: {}", response);
154 // count the responses, on last switch behaviors
157 return switchToSupervisor();
163 private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
164 LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
165 currentCandidates, currentOwners);
166 return Behaviors.setup(ctx -> OwnerSupervisor.create(currentCandidates, currentOwners, iidCodec));
169 private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
170 final DOMEntity entity = entityLookup.get(rsp.key().id());
171 final String owner = rsp.get(rsp.key()).getValue();
173 currentOwners.put(entity, owner);
176 private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
177 LOG.debug("Owner not found. {}", rsp);