Convert DatastoreSnapshotRestore to OSGi DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RootDataTreeChangeListenerProxy.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.OnComplete;
18 import com.google.common.collect.Maps;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.Set;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.checkerframework.checker.lock.qual.Holding;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
30 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
31 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
34 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 final class RootDataTreeChangeListenerProxy<L extends DOMDataTreeChangeListener>
40         extends AbstractListenerRegistration<L> {
41     private abstract static class State {
42
43     }
44
45     private static final class ResolveShards extends State {
46         final Map<String, Object> localShards = new HashMap<>();
47         final int shardCount;
48
49         ResolveShards(final int shardCount) {
50             this.shardCount = shardCount;
51         }
52     }
53
54     private static final class Subscribed extends State {
55         final List<ActorSelection> subscriptions;
56         final ActorRef dtclActor;
57
58         Subscribed(final ActorRef dtclActor, final int shardCount) {
59             this.dtclActor = requireNonNull(dtclActor);
60             subscriptions = new ArrayList<>(shardCount);
61         }
62     }
63
64     private static final class Terminated extends State {
65
66     }
67
68     private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class);
69
70     private final ActorUtils actorUtils;
71
72     @GuardedBy("this")
73     private State state;
74
75     RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener,
76             final Set<String> shardNames) {
77         super(listener);
78         this.actorUtils = requireNonNull(actorUtils);
79         this.state = new ResolveShards(shardNames.size());
80
81         for (String shardName : shardNames) {
82             actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
83                 @Override
84                 public void onComplete(final Throwable failure, final ActorRef success) {
85                     onFindLocalShardComplete(shardName, failure, success);
86                 }
87             }, actorUtils.getClientDispatcher());
88         }
89     }
90
91     @Override
92     protected synchronized void removeRegistration() {
93         if (state instanceof Terminated) {
94             // Trivial case: we have already terminated on a failure, so this is a no-op
95         } else if (state instanceof ResolveShards) {
96             // Simple case: just mark the fact we were closed, terminating when resolution finishes
97             state = new Terminated();
98         } else if (state instanceof Subscribed) {
99             terminate((Subscribed) state);
100         } else {
101             throw new IllegalStateException("Unhandled close in state " + state);
102         }
103     }
104
105     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
106             justification = "https://github.com/spotbugs/spotbugs/issues/811")
107     private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure,
108             final ActorRef shard) {
109         if (state instanceof ResolveShards) {
110             localShardsResolved((ResolveShards) state, shardName, failure, shard);
111         } else {
112             LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state);
113         }
114     }
115
116     @Holding("this")
117     private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure,
118             final ActorRef shard) {
119         final Object result = failure != null ? failure : verifyNotNull(shard);
120         LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result);
121         current.localShards.put(shardName, result);
122
123         if (current.localShards.size() == current.shardCount) {
124             // We have all the responses we need
125             if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) {
126                 reportFailure(current.localShards);
127             } else {
128                 subscribeToShards(current.localShards);
129             }
130         }
131     }
132
133     @Holding("this")
134     private void reportFailure(final Map<String, Object> localShards) {
135         for (Entry<String, Object> entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) {
136             final Throwable cause = (Throwable) entry.getValue();
137             LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(),
138                 getInstance(), cause);
139         }
140         state = new Terminated();
141     }
142
143     @Holding("this")
144     private void subscribeToShards(final Map<String, Object> localShards) {
145         // Safety check before we start doing anything
146         for (Entry<String, Object> entry : localShards.entrySet()) {
147             final Object obj = entry.getValue();
148             verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey());
149         }
150
151         // Instantiate the DTCL actor and update state
152         final ActorRef dtclActor = actorUtils.getActorSystem().actorOf(
153             RootDataTreeChangeListenerActor.props(getInstance(), localShards.size())
154               .withDispatcher(actorUtils.getNotificationDispatcherPath()));
155         state = new Subscribed(dtclActor, localShards.size());
156
157         // Subscribe to all shards
158         final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener(
159             YangInstanceIdentifier.empty(), dtclActor, true);
160         for (Entry<String, Object> entry : localShards.entrySet()) {
161             // Do not retain references to localShards
162             final String shardName = entry.getKey();
163             final ActorRef shard = (ActorRef) entry.getValue();
164
165             actorUtils.executeOperationAsync(shard, regMessage,
166                 actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
167                     @Override
168                     public void onComplete(final Throwable failure, final Object result) {
169                         onShardSubscribed(shardName, failure, result);
170                     }
171                 }, actorUtils.getClientDispatcher());
172         }
173     }
174
175     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
176             justification = "https://github.com/spotbugs/spotbugs/issues/811")
177     private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) {
178         if (state instanceof Subscribed) {
179             final Subscribed current = (Subscribed) state;
180             if (failure != null) {
181                 LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(),
182                     shardName,getInstance(), failure);
183                 terminate(current);
184             } else {
185                 onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result);
186             }
187         } else {
188             terminateSubscription(shardName, failure, result);
189         }
190     }
191
192     @Holding("this")
193     private void onSuccessfulSubscription(final Subscribed current, final String shardName,
194             final RegisterDataTreeNotificationListenerReply reply) {
195         final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath());
196         LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor);
197         current.subscriptions.add(regActor);
198     }
199
200     @Holding("this")
201     private void terminate(final Subscribed current) {
202         // Terminate the listener
203         current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
204         // Terminate all subscriptions
205         for (ActorSelection regActor : current.subscriptions) {
206             regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
207         }
208         state = new Terminated();
209     }
210
211     // This method should not modify internal state
212     private void terminateSubscription(final String shardName, final Throwable failure, final Object result) {
213         if (failure == null) {
214             final ActorSelection regActor = actorUtils.actorSelection(
215                 ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath());
216             LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName,
217                 regActor);
218             regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
219         } else {
220             LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure);
221         }
222     }
223
224     private String logContext() {
225         return actorUtils.getDatastoreContext().getLogicalStoreType().toString();
226     }
227 }