Do not implement concepts.Builder
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RootDataTreeChangeListenerProxy.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.OnComplete;
18 import com.google.common.collect.Maps;
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.Set;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
29 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
30 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
31 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
33 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 final class RootDataTreeChangeListenerProxy<L extends DOMDataTreeChangeListener>
39         extends AbstractListenerRegistration<L> {
40     private abstract static class State {
41
42     }
43
44     private static final class ResolveShards extends State {
45         final Map<String, Object> localShards = new HashMap<>();
46         final int shardCount;
47
48         ResolveShards(final int shardCount) {
49             this.shardCount = shardCount;
50         }
51     }
52
53     private static final class Subscribed extends State {
54         final List<ActorSelection> subscriptions;
55         final ActorRef dtclActor;
56
57         Subscribed(final ActorRef dtclActor, final int shardCount) {
58             this.dtclActor = requireNonNull(dtclActor);
59             subscriptions = new ArrayList<>(shardCount);
60         }
61     }
62
63     private static final class Terminated extends State {
64
65     }
66
67     private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class);
68
69     private final ActorUtils actorUtils;
70
71     @GuardedBy("this")
72     private State state;
73
74     RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener,
75             final Set<String> shardNames) {
76         super(listener);
77         this.actorUtils = requireNonNull(actorUtils);
78         this.state = new ResolveShards(shardNames.size());
79
80         for (String shardName : shardNames) {
81             actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
82                 @Override
83                 public void onComplete(final Throwable failure, final ActorRef success) {
84                     onFindLocalShardComplete(shardName, failure, success);
85                 }
86             }, actorUtils.getClientDispatcher());
87         }
88     }
89
90     @Override
91     protected synchronized void removeRegistration() {
92         if (state instanceof Terminated) {
93             // Trivial case: we have already terminated on a failure, so this is a no-op
94         } else if (state instanceof ResolveShards) {
95             // Simple case: just mark the fact we were closed, terminating when resolution finishes
96             state = new Terminated();
97         } else if (state instanceof Subscribed) {
98             terminate((Subscribed) state);
99         } else {
100             throw new IllegalStateException("Unhandled close in state " + state);
101         }
102     }
103
104     private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure,
105             final ActorRef shard) {
106         if (state instanceof ResolveShards) {
107             localShardsResolved((ResolveShards) state, shardName, failure, shard);
108         } else {
109             LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state);
110         }
111     }
112
113     @Holding("this")
114     private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure,
115             final ActorRef shard) {
116         final Object result = failure != null ? failure : verifyNotNull(shard);
117         LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result);
118         current.localShards.put(shardName, result);
119
120         if (current.localShards.size() == current.shardCount) {
121             // We have all the responses we need
122             if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) {
123                 reportFailure(current.localShards);
124             } else {
125                 subscribeToShards(current.localShards);
126             }
127         }
128     }
129
130     @Holding("this")
131     private void reportFailure(final Map<String, Object> localShards) {
132         for (Entry<String, Object> entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) {
133             final Throwable cause = (Throwable) entry.getValue();
134             LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(),
135                 getInstance(), cause);
136         }
137         state = new Terminated();
138     }
139
140     @Holding("this")
141     private void subscribeToShards(final Map<String, Object> localShards) {
142         // Safety check before we start doing anything
143         for (Entry<String, Object> entry : localShards.entrySet()) {
144             final Object obj = entry.getValue();
145             verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey());
146         }
147
148         // Instantiate the DTCL actor and update state
149         final ActorRef dtclActor = actorUtils.getActorSystem().actorOf(
150             RootDataTreeChangeListenerActor.props(getInstance(), localShards.size())
151               .withDispatcher(actorUtils.getNotificationDispatcherPath()));
152         state = new Subscribed(dtclActor, localShards.size());
153
154         // Subscribe to all shards
155         final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener(
156             YangInstanceIdentifier.empty(), dtclActor, true);
157         for (Entry<String, Object> entry : localShards.entrySet()) {
158             // Do not retain references to localShards
159             final String shardName = entry.getKey();
160             final ActorRef shard = (ActorRef) entry.getValue();
161
162             actorUtils.executeOperationAsync(shard, regMessage,
163                 actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
164                     @Override
165                     public void onComplete(final Throwable failure, final Object result) {
166                         onShardSubscribed(shardName, failure, result);
167                     }
168                 }, actorUtils.getClientDispatcher());
169         }
170     }
171
172     private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) {
173         if (state instanceof Subscribed) {
174             final Subscribed current = (Subscribed) state;
175             if (failure != null) {
176                 LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(),
177                     shardName,getInstance(), failure);
178                 terminate(current);
179             } else {
180                 onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result);
181             }
182         } else {
183             terminateSubscription(shardName, failure, result);
184         }
185     }
186
187     @Holding("this")
188     private void onSuccessfulSubscription(final Subscribed current, final String shardName,
189             final RegisterDataTreeNotificationListenerReply reply) {
190         final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath());
191         LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor);
192         current.subscriptions.add(regActor);
193     }
194
195     @Holding("this")
196     private void terminate(final Subscribed current) {
197         // Terminate the listener
198         current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
199         // Terminate all subscriptions
200         for (ActorSelection regActor : current.subscriptions) {
201             regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
202         }
203         state = new Terminated();
204     }
205
206     // This method should not modify internal state
207     private void terminateSubscription(final String shardName, final Throwable failure, final Object result) {
208         if (failure == null) {
209             final ActorSelection regActor = actorUtils.actorSelection(
210                 ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath());
211             LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName,
212                 regActor);
213             regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
214         } else {
215             LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure);
216         }
217     }
218
219     private String logContext() {
220         return actorUtils.getDatastoreContext().getLogicalStoreType().toString();
221     }
222 }