Merge branch 'mdsal-trace' from controller
[mdsal.git] / dom / mdsal-dom-spi / src / main / java / org / opendaylight / mdsal / dom / spi / shard / DOMDataTreeListenerAggregator.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.spi.shard;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.Beta;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableMap.Builder;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.Executors;
27 import java.util.function.Function;
28 import javax.annotation.concurrent.GuardedBy;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
33 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
42  * listener.
43  *
44  * @author Robert Varga
45  */
46 @Beta
47 public final class DOMDataTreeListenerAggregator
48         extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
49
50     abstract static class State extends AbstractStateAggregator.State {
51
52     }
53
54     private static final class Aggregated extends State {
55         final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
56         final Collection<DOMDataTreeListeningException> failures;
57         final Collection<DataTreeCandidate> changes;
58
59         Aggregated(final Collection<DataTreeCandidate> changes,
60             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
61             final Collection<DOMDataTreeListeningException> failures) {
62             this.changes = requireNonNull(changes);
63             this.subtrees = requireNonNull(subtrees);
64             this.failures = requireNonNull(failures);
65         }
66     }
67
68     private static final class Changes extends State {
69         final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
70         final Collection<DataTreeCandidate> changes;
71
72         Changes(final Collection<DataTreeCandidate> changes,
73             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
74             this.changes = requireNonNull(changes);
75             this.subtrees = requireNonNull(subtrees);
76         }
77     }
78
79     private static final class Failure extends State {
80         final Collection<DOMDataTreeListeningException> causes;
81
82         Failure(final Collection<DOMDataTreeListeningException> causes) {
83             this.causes = requireNonNull(causes);
84         }
85     }
86
87     private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
88         @GuardedBy("this")
89         private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
90         @GuardedBy("this")
91         private final Collection<DataTreeCandidate> changes = new ArrayList<>();
92         @GuardedBy("this")
93         private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
94
95         @Override
96         protected void append(final State state) {
97             if (state instanceof Changes) {
98                 final Changes changesState = (Changes) state;
99                 this.changes.addAll(changesState.changes);
100                 subtrees = ImmutableMap.copyOf(changesState.subtrees);
101             } else if (state instanceof Failure) {
102                 causes.addAll(((Failure) state).causes);
103             } else {
104                 throw new IllegalStateException("Unexpected state " + state);
105             }
106         }
107
108         @Override
109         protected synchronized void appendInitial(final State state) {
110             // TODO: we could index and compress state changes here
111             if (state instanceof Changes) {
112                 final Changes changesState = (Changes) state;
113                 this.changes.addAll(changesState.changes);
114                 subtrees = ImmutableMap.copyOf(changesState.subtrees);
115             } else if (state instanceof Failure) {
116                 causes.addAll(((Failure) state).causes);
117             } else {
118                 throw new IllegalStateException("Unexpected state " + state);
119             }
120         }
121
122         @Override
123         public synchronized Aggregated build() {
124             final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
125                 ImmutableList.copyOf(causes));
126             changes.clear();
127             causes.clear();
128             return ret;
129         }
130     }
131
132     private static final class Operational extends AbstractStateAggregator.Operational<State> {
133         private final DOMDataTreeListener listener;
134         private boolean failed;
135
136         Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
137                 final DOMDataTreeListener listener) {
138             super(builders);
139             this.listener = requireNonNull(listener);
140         }
141
142         @Override
143         protected void notifyListener(final Iterator<State> iterator) {
144             if (failed) {
145                 iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", state));
146                 return;
147             }
148
149             final Stopwatch clock = Stopwatch.createStarted();
150             final List<DataTreeCandidate> changes = new ArrayList<>();
151             final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
152             final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
153             while (iterator.hasNext()) {
154                 collectState(iterator.next(), changes, subtrees, failures);
155             }
156
157             if (!changes.isEmpty()) {
158                 // Note: it is okay to leak changes, we must never leak mutable subtrees.
159                 callListener(listener, changes, subtrees.build());
160             }
161             if (!failures.isEmpty()) {
162                 failed = true;
163                 listener.onDataTreeFailed(failures);
164             }
165
166             LOG.trace("Listener {} notification completed in {}", listener, clock);
167         }
168     }
169
170     private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
171
172     // Because a component listener may report a failure before we finish registering all listeners, we need a way
173     // to trigger a failure report from the thread *not* performing the registration.
174     private static final Executor FAILURE_NOTIFIER;
175
176     static {
177         final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
178                 .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
179         FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
180     }
181
182     private final boolean allowRxMerges;
183
184     public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
185         super(sizeHint);
186         this.allowRxMerges = allowRxMerges;
187     }
188
189     public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
190             final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
191             final Function<T, DOMDataTreeShard> keyToShard) {
192         if (subtrees.size() == 1) {
193             final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
194                     .next();
195             return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
196                     .registerListener(listener, entry.getValue(), allowRxMerges);
197         }
198
199         // Alright, this the real deal, we have to aggregate.
200         final int size = subtrees.size();
201         final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
202         final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
203         for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
204             regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
205                 .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
206         }
207
208         return aggregator.start(listener, regs);
209     }
210
211     public DOMDataTreeListener createListener() {
212         // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
213         final StateBuilder builder = new StateBuilder();
214         addBuilder(builder);
215
216         return new DOMDataTreeListener() {
217             @Override
218             public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
219                 receiveState(builder, new Failure(causes));
220             }
221
222             @Override
223             public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
224                     final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
225                 receiveState(builder, new Changes(changes, subtrees));
226             }
227         };
228     }
229
230     public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
231             final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
232
233         final Started<State> result = start(builders -> start(listener, regs, builders));
234         if (result instanceof Failed) {
235             return new AbstractListenerRegistration<L>(listener) {
236                 @Override
237                 protected void removeRegistration() {
238                     // Listeners have already been closed, this is a no-op
239                 }
240             };
241         }
242
243         return new AbstractListenerRegistration<L>(listener) {
244             @Override
245             protected void removeRegistration() {
246                 regs.forEach(ListenerRegistration::close);
247             }
248         };
249     }
250
251     static Started<State> start(final DOMDataTreeListener listener,
252             final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
253             final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
254
255         final List<DataTreeCandidate> changes = new ArrayList<>();
256         final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
257         final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
258         for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
259             collectState(builder.build(), changes, subtrees, failures);
260         }
261
262         if (!failures.isEmpty()) {
263             regs.forEach(ListenerRegistration::close);
264             FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
265             return new Failed<>(builders);
266         }
267         if (!changes.isEmpty()) {
268             callListener(listener, changes, subtrees.build());
269         }
270
271         return new Operational(builders, listener);
272     }
273
274     @SuppressWarnings("checkstyle:IllegalCatch")
275     static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
276             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
277         try {
278             listener.onDataTreeChanged(changes, subtrees);
279         } catch (Exception e) {
280             LOG.error("Listener {} failed to process initial changes", listener, e);
281         }
282     }
283
284     static void collectState(final State state, final Collection<DataTreeCandidate> changes,
285             final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
286             final Collection<DOMDataTreeListeningException> failures) {
287         Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
288         final Aggregated aggregated = (Aggregated) state;
289
290         subtrees.putAll(aggregated.subtrees);
291         changes.addAll(aggregated.changes);
292         failures.addAll(aggregated.failures);
293     }
294 }