Do not generate 'isFoo()' methods
[mdsal.git] / dom / mdsal-dom-spi / src / main / java / org / opendaylight / mdsal / dom / spi / shard / DOMDataTreeListenerAggregator.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.spi.shard;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.Beta;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableMap.Builder;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.Executors;
27 import java.util.function.Function;
28 import org.checkerframework.checker.lock.qual.GuardedBy;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
33 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
42  * listener.
43  *
44  * @author Robert Varga
45  *
46  * @deprecated This class is scheduled for removal in the next major release.
47  */
48 @Deprecated(forRemoval = true)
49 @Beta
50 public final class DOMDataTreeListenerAggregator
51         extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
52
53     abstract static class State extends AbstractStateAggregator.State {
54
55     }
56
57     private static final class Aggregated extends State {
58         final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
59         final Collection<DOMDataTreeListeningException> failures;
60         final Collection<DataTreeCandidate> changes;
61
62         Aggregated(final Collection<DataTreeCandidate> changes,
63             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
64             final Collection<DOMDataTreeListeningException> failures) {
65             this.changes = requireNonNull(changes);
66             this.subtrees = requireNonNull(subtrees);
67             this.failures = requireNonNull(failures);
68         }
69     }
70
71     private static final class Changes extends State {
72         final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
73         final Collection<DataTreeCandidate> changes;
74
75         Changes(final Collection<DataTreeCandidate> changes,
76             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
77             this.changes = requireNonNull(changes);
78             this.subtrees = requireNonNull(subtrees);
79         }
80     }
81
82     private static final class Failure extends State {
83         final Collection<DOMDataTreeListeningException> causes;
84
85         Failure(final Collection<DOMDataTreeListeningException> causes) {
86             this.causes = requireNonNull(causes);
87         }
88     }
89
90     private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
91         @GuardedBy("this")
92         private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
93         @GuardedBy("this")
94         private final Collection<DataTreeCandidate> changes = new ArrayList<>();
95         @GuardedBy("this")
96         private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
97
98         @Override
99         protected synchronized void append(final State state) {
100             if (state instanceof Changes) {
101                 final Changes changesState = (Changes) state;
102                 this.changes.addAll(changesState.changes);
103                 subtrees = ImmutableMap.copyOf(changesState.subtrees);
104             } else if (state instanceof Failure) {
105                 causes.addAll(((Failure) state).causes);
106             } else {
107                 throw new IllegalStateException("Unexpected state " + state);
108             }
109         }
110
111         @Override
112         protected synchronized void appendInitial(final State state) {
113             // TODO: we could index and compress state changes here
114             if (state instanceof Changes) {
115                 final Changes changesState = (Changes) state;
116                 this.changes.addAll(changesState.changes);
117                 subtrees = ImmutableMap.copyOf(changesState.subtrees);
118             } else if (state instanceof Failure) {
119                 causes.addAll(((Failure) state).causes);
120             } else {
121                 throw new IllegalStateException("Unexpected state " + state);
122             }
123         }
124
125         @Override
126         public synchronized Aggregated build() {
127             final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
128                 ImmutableList.copyOf(causes));
129             changes.clear();
130             causes.clear();
131             return ret;
132         }
133     }
134
135     private static final class Operational extends AbstractStateAggregator.Operational<State> {
136         private final DOMDataTreeListener listener;
137         private boolean failed;
138
139         Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
140                 final DOMDataTreeListener listener) {
141             super(builders);
142             this.listener = requireNonNull(listener);
143         }
144
145         @Override
146         protected void notifyListener(final Iterator<State> iterator) {
147             if (failed) {
148                 iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", listener, state));
149                 return;
150             }
151
152             final Stopwatch clock = Stopwatch.createStarted();
153             final List<DataTreeCandidate> changes = new ArrayList<>();
154             final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
155             final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
156             while (iterator.hasNext()) {
157                 collectState(iterator.next(), changes, subtrees, failures);
158             }
159
160             if (!changes.isEmpty()) {
161                 // Note: it is okay to leak changes, we must never leak mutable subtrees.
162                 callListener(listener, changes, subtrees.build());
163             }
164             if (!failures.isEmpty()) {
165                 failed = true;
166                 listener.onDataTreeFailed(failures);
167             }
168
169             LOG.trace("Listener {} notification completed in {}", listener, clock);
170         }
171     }
172
173     private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
174
175     // Because a component listener may report a failure before we finish registering all listeners, we need a way
176     // to trigger a failure report from the thread *not* performing the registration.
177     private static final Executor FAILURE_NOTIFIER;
178
179     static {
180         final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
181                 .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
182         FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
183     }
184
185     private final boolean allowRxMerges;
186
187     public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
188         super(sizeHint);
189         this.allowRxMerges = allowRxMerges;
190     }
191
192     public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
193             final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
194             final Function<T, DOMDataTreeShard> keyToShard) {
195         if (subtrees.size() == 1) {
196             final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
197                     .next();
198             return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
199                     .registerListener(listener, entry.getValue(), allowRxMerges);
200         }
201
202         // Alright, this the real deal, we have to aggregate.
203         final int size = subtrees.size();
204         final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
205         final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
206         for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
207             regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
208                 .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
209         }
210
211         return aggregator.start(listener, regs);
212     }
213
214     public DOMDataTreeListener createListener() {
215         // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
216         final StateBuilder builder = new StateBuilder();
217         addBuilder(builder);
218
219         return new DOMDataTreeListener() {
220             @Override
221             public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
222                 receiveState(builder, new Failure(causes));
223             }
224
225             @Override
226             public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
227                     final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
228                 receiveState(builder, new Changes(changes, subtrees));
229             }
230         };
231     }
232
233     public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
234             final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
235
236         final Started<State> result = start(builders -> start(listener, regs, builders));
237         if (result instanceof Failed) {
238             return new AbstractListenerRegistration<>(listener) {
239                 @Override
240                 protected void removeRegistration() {
241                     // Listeners have already been closed, this is a no-op
242                 }
243             };
244         }
245
246         return new AbstractListenerRegistration<>(listener) {
247             @Override
248             protected void removeRegistration() {
249                 regs.forEach(ListenerRegistration::close);
250             }
251         };
252     }
253
254     static Started<State> start(final DOMDataTreeListener listener,
255             final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
256             final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
257
258         final List<DataTreeCandidate> changes = new ArrayList<>();
259         final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
260         final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
261         for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
262             collectState(builder.build(), changes, subtrees, failures);
263         }
264
265         if (!failures.isEmpty()) {
266             regs.forEach(ListenerRegistration::close);
267             FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
268             return new Failed<>(builders);
269         }
270         if (!changes.isEmpty()) {
271             callListener(listener, changes, subtrees.build());
272         }
273
274         return new Operational(builders, listener);
275     }
276
277     @SuppressWarnings("checkstyle:IllegalCatch")
278     static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
279             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
280         try {
281             listener.onDataTreeChanged(changes, subtrees);
282         } catch (Exception e) {
283             LOG.error("Listener {} failed to process initial changes", listener, e);
284         }
285     }
286
287     static void collectState(final State state, final Collection<DataTreeCandidate> changes,
288             final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
289             final Collection<DOMDataTreeListeningException> failures) {
290         Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
291         final Aggregated aggregated = (Aggregated) state;
292
293         subtrees.putAll(aggregated.subtrees);
294         changes.addAll(aggregated.changes);
295         failures.addAll(aggregated.failures);
296     }
297 }