Introduce DOMEntityOwnershipService replacement
[controller.git] / opendaylight / md-sal / eos-dom-akka / src / main / java / org / opendaylight / controller / eos / akka / owner / supervisor / OwnerSyncer.java
1 /*
2  * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.eos.akka.owner.supervisor;
9
10 import akka.actor.typed.ActorRef;
11 import akka.actor.typed.Behavior;
12 import akka.actor.typed.javadsl.AbstractBehavior;
13 import akka.actor.typed.javadsl.ActorContext;
14 import akka.actor.typed.javadsl.Behaviors;
15 import akka.actor.typed.javadsl.Receive;
16 import akka.cluster.ddata.LWWRegister;
17 import akka.cluster.ddata.LWWRegisterKey;
18 import akka.cluster.ddata.ORMap;
19 import akka.cluster.ddata.ORSet;
20 import akka.cluster.ddata.typed.javadsl.DistributedData;
21 import akka.cluster.ddata.typed.javadsl.Replicator;
22 import akka.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
23 import java.time.Duration;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialCandidateSync;
29 import org.opendaylight.controller.eos.akka.owner.supervisor.command.InitialOwnerSync;
30 import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
31 import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
32 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Behavior that retrieves current candidates/owners from distributed-data and switches to OwnerSupervisor when the
38  * sync has finished.
39  */
40 public final class OwnerSyncer extends AbstractBehavior<OwnerSupervisorCommand> {
41     private static final Logger LOG = LoggerFactory.getLogger(OwnerSyncer.class);
42
43     private final ReplicatorMessageAdapter<OwnerSupervisorCommand, LWWRegister<String>> ownerReplicator;
44     private final Map<DOMEntity, Set<String>> currentCandidates = new HashMap<>();
45     private final Map<DOMEntity, String> currentOwners = new HashMap<>();
46
47     // String representation of Entity to DOMEntity
48     private final Map<String, DOMEntity> entityLookup = new HashMap<>();
49
50     private int toSync = -1;
51
52     private OwnerSyncer(final ActorContext<OwnerSupervisorCommand> context) {
53         super(context);
54         LOG.debug("Starting candidate and owner sync");
55
56         final ActorRef<Replicator.Command> replicator = DistributedData.get(context.getSystem()).replicator();
57
58         this.ownerReplicator = new ReplicatorMessageAdapter<>(context, replicator, Duration.ofSeconds(5));
59
60         new ReplicatorMessageAdapter<OwnerSupervisorCommand, ORMap<DOMEntity, ORSet<String>>>(context, replicator,
61             Duration.ofSeconds(5)).askGet(
62                 askReplyTo -> new Replicator.Get<>(CandidateRegistry.KEY, Replicator.readLocal(), askReplyTo),
63                 InitialCandidateSync::new);
64     }
65
66     public static Behavior<OwnerSupervisorCommand> create() {
67         return Behaviors.setup(OwnerSyncer::new);
68     }
69
70     @Override
71     public Receive<OwnerSupervisorCommand> createReceive() {
72         return newReceiveBuilder()
73                 .onMessage(InitialCandidateSync.class, this::onInitialCandidateSync)
74                 .onMessage(InitialOwnerSync.class, this::onInitialOwnerSync)
75                 .build();
76     }
77
78     private Behavior<OwnerSupervisorCommand> onInitialCandidateSync(final InitialCandidateSync rsp) {
79         final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response = rsp.getResponse();
80         if (response instanceof Replicator.GetSuccess) {
81             return doInitialSync((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response);
82         } else if (response instanceof Replicator.NotFound) {
83             LOG.debug("No candidates found switching to supervisor");
84             return switchToSupervisor();
85         } else {
86             LOG.debug("Initial candidate sync failed, switching to supervisor. Sync reply: {}", response);
87             return switchToSupervisor();
88         }
89     }
90
91     private Behavior<OwnerSupervisorCommand> doInitialSync(
92             final Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>> response) {
93
94         final ORMap<DOMEntity, ORSet<String>> candidates = response.get(CandidateRegistry.KEY);
95         candidates.getEntries().entrySet().forEach(entry -> {
96             currentCandidates.put(entry.getKey(), new HashSet<>(entry.getValue().getElements()));
97         });
98
99         toSync = candidates.keys().size();
100         for (final DOMEntity entity : candidates.keys().getElements()) {
101             entityLookup.put(entity.toString(), entity);
102
103             ownerReplicator.askGet(
104                     askReplyTo -> new Replicator.Get<>(
105                             new LWWRegisterKey<>(entity.toString()),
106                             Replicator.readLocal(),
107                             askReplyTo),
108                     InitialOwnerSync::new);
109         }
110
111         return this;
112     }
113
114     private Behavior<OwnerSupervisorCommand> onInitialOwnerSync(final InitialOwnerSync rsp) {
115         final Replicator.GetResponse<LWWRegister<String>> response = rsp.getResponse();
116         if (response instanceof Replicator.GetSuccess) {
117             handleOwnerRsp((Replicator.GetSuccess<LWWRegister<String>>) response);
118         } else if (response instanceof Replicator.NotFound) {
119             handleNotFoundOwnerRsp((Replicator.NotFound<LWWRegister<String>>) response);
120         } else {
121             LOG.debug("Initial sync failed response: {}", response);
122         }
123
124         // count the responses, on last switch behaviors
125         toSync--;
126         if (toSync == 0) {
127             return switchToSupervisor();
128         }
129
130         return this;
131     }
132
133     private Behavior<OwnerSupervisorCommand> switchToSupervisor() {
134         LOG.debug("Initial sync done, switching to supervisor. candidates: {}, owners: {}",
135                 currentCandidates, currentOwners);
136         return Behaviors.setup(ctx ->
137                 OwnerSupervisor.create(currentCandidates, currentOwners));
138     }
139
140     private void handleOwnerRsp(final Replicator.GetSuccess<LWWRegister<String>> rsp) {
141         final DOMEntity entity = entityLookup.get(rsp.key().id());
142         final String owner = rsp.get(rsp.key()).getValue();
143
144         currentOwners.put(entity, owner);
145     }
146
147     private static void handleNotFoundOwnerRsp(final Replicator.NotFound<LWWRegister<String>> rsp) {
148         LOG.debug("Owner not found. {}", rsp);
149     }
150 }