Move {Identifiable,Persistent,}Payload
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSyncer.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.typed.ActorRef;
13 import akka.actor.typed.Behavior;
14 import akka.actor.typed.javadsl.ActorContext;
15 import akka.actor.typed.javadsl.Behaviors;
16 import akka.actor.typed.javadsl.Receive;
17 import akka.cluster.ddata.LWWRegister;
18 import akka.cluster.ddata.LWWRegisterKey;
19 import akka.cluster.ddata.ORMap;
20 import akka.cluster.ddata.ORSet;
21 import akka.cluster.ddata.typed.javadsl.DistributedData;
22 import akka.cluster.ddata.typed.javadsl.Replicator;
23 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
24 import akka.pattern.StatusReply;
25 import java.time.Duration;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Map;
29 import java.util.Set;
30 import org.eclipse.jdt.annotation.Nullable;
31 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidates;
32 import org.opendaylight.controller.eos.akka.owner.supervisor.command.ClearCandidatesForMember;
33 import org.opendaylight.controller.eos.akka.owner.supervisor.command.DataCenterActivated;
34 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntitiesBackendRequest;
35 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityBackendRequest;
36 import org.opendaylight.controller.eos.akka.owner.supervisor.command.GetEntityOwnerBackendRequest;
37 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
38 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
39 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
40 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
41 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorRequest;
42 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
43 import org.opendaylight.mdsal.binding.dom.codec.api.BindingInstanceIdentifierCodec;
44 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
50  * sync has finished.
51  */
52 public final class OwnerSyncer extends AbstractSupervisor {
53     private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
54
55     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
56     private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
57     private final Map<DOMEntity, String> currentOwners = new HashMap<>();
58
59     // String representation of Entity to DOMEntity
60     private final Map<String, DOMEntity> entityLookup = new HashMap<>();
61     private final BindingInstanceIdentifierCodec iidCodec;
62
63     private int toSync = -1;
64
65     private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context,
66                         final @Nullable ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
67                         final BindingInstanceIdentifierCodec iidCodec) {
68         super(context);
69         this.iidCodec = requireNonNull(iidCodec);
70         LOG.debug("Starting candidate and owner sync");
71
72         final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
73
74         ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
75
76         candidateReplicator.askGet(
77                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
78                 InitialCandidateSync::new);
79
80         if (notifyDatacenterStarted != null) {
81             notifyDatacenterStarted.tell(DataCenterActivated.INSTANCE);
82         }
83     }
84
85     public static Behavior<OwnerSupervisorCommand> create(final ActorRef<OwnerSupervisorReply> notifyDatacenterStarted,
86             final BindingInstanceIdentifierCodec iidCodec) {
87         return Behaviors.setup(ctx -> new OwnerSyncer(ctx, notifyDatacenterStarted, iidCodec));
88     }
89
90     @Override
91     public Receive<OwnerSupervisorCommand> createReceive() {
92         return newReceiveBuilder()
93                 .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
94                 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
95                 .onMessage(GetEntitiesBackendRequest.class, this::onFailEntityRpc)
96                 .onMessage(GetEntityBackendRequest.class, this::onFailEntityRpc)
97                 .onMessage(GetEntityOwnerBackendRequest.class, this::onFailEntityRpc)
98                 .onMessage(ClearCandidatesForMember.class, this::onClearCandidatesForMember)
99                 .onMessage(ClearCandidates.class, this::finishClearCandidates)
100                 .build();
101     }
102
103     private Behavior<OwnerSupervisorCommand> onFailEntityRpc(final OwnerSupervisorRequest message) {
104         LOG.debug("Failing rpc request. {}", message);
105         message.getReplyTo().tell(StatusReply.error(
106             "OwnerSupervisor is inactive so it cannot handle entity rpc requests."));
107         return this;
108     }
109
110     private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
111         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
112         if (response instanceof Replicator.GetSuccess) {
113             return doInitialSync((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
114         } else if (response instanceof Replicator.NotFound) {
115             LOG.debug("No candidates found switching to supervisor");
116             return switchToSupervisor();
117         } else {
118             LOG.debug("Initial candidate sync failed, switching to supervisor. Sync reply: {}", response);
119             return switchToSupervisor();
120         }
121     }
122
123     private Behavior<OwnerSupervisorCommand> doInitialSync(
124             final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
125
126         final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
127         candidates.getEntries().entrySet().forEach(entry -> {
128             currentCandidates.put(entry.getKey(), new HashSet<>(entry.getValue().getElements()));
129         });
130
131         toSync = candidates.keys().size();
132         for (final DOMEntity entity : candidates.keys().getElements()) {
133             entityLookup.put(entity.toString(), entity);
134
135             ownerReplicator.askGet(
136                     askReplyTo -> new Replicator.Get<>(
137                             new LWWRegisterKey<>(entity.toString()),
138                             Replicator.readLocal(),
139                             askReplyTo),
140                     InitialOwnerSync::new);
141         }
142
143         return this;
144     }
145
146     private Behavior<OwnerSupervisorCommand> onInitialOwnerSync(final InitialOwnerSync rsp) {
147         final Replicator.GetResponse<LWWRegister<String>> response = rsp.getResponse();
148         if (response instanceof Replicator.GetSuccess) {
149             handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
150         } else if (response instanceof Replicator.NotFound) {
151             handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
152         } else {
153             LOG.debug("Initial sync failed response: {}", response);
154         }
155
156         // count the responses, on last switch behaviors
157         toSync--;
158         if (toSync == 0) {
159             return switchToSupervisor();
160         }
161
162         return this;
163     }
164
165     private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
166         LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
167                 currentCandidates, currentOwners);
168         return Behaviors.setup(ctx -> OwnerSupervisor.create(currentCandidates, currentOwners, iidCodec));
169     }
170
171     private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
172         final DOMEntity entity = entityLookup.get(rsp.key().id());
173         final String owner = rsp.get(rsp.key()).getValue();
174
175         currentOwners.put(entity, owner);
176     }
177
178     private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
179         LOG.debug("Owner not found. {}", rsp);
180     }
181
182     @Override
183     Logger getLogger() {
184         return LOG;
185     }
186 }