Fix odlparent-3.0.0 checkstyle issues
[mdsal.git] / dom / mdsal-dom-spi / src / main / java / org / opendaylight / mdsal / dom / spi / shard / DOMDataTreeListenerAggregator.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.spi.shard;
9
10 import com.google.common.annotations.Beta;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Stopwatch;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.ImmutableMap.Builder;
17 import com.google.common.util.concurrent.ThreadFactoryBuilder;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.Executors;
26 import java.util.function.Function;
27 import javax.annotation.concurrent.GuardedBy;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
32 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
41  * listener.
42  *
43  * @author Robert Varga
44  */
45 @Beta
46 public final class DOMDataTreeListenerAggregator
47         extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
48
49     abstract static class State extends AbstractStateAggregator.State {
50
51     }
52
53     private static final class Aggregated extends State {
54         final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
55         final Collection<DOMDataTreeListeningException> failures;
56         final Collection<DataTreeCandidate> changes;
57
58         Aggregated(final Collection<DataTreeCandidate> changes,
59             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
60             final Collection<DOMDataTreeListeningException> failures) {
61             this.changes = Preconditions.checkNotNull(changes);
62             this.subtrees = Preconditions.checkNotNull(subtrees);
63             this.failures = Preconditions.checkNotNull(failures);
64         }
65     }
66
67     private static final class Changes extends State {
68         final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
69         final Collection<DataTreeCandidate> changes;
70
71         Changes(final Collection<DataTreeCandidate> changes,
72             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
73             this.changes = Preconditions.checkNotNull(changes);
74             this.subtrees = Preconditions.checkNotNull(subtrees);
75         }
76     }
77
78     private static final class Failure extends State {
79         final Collection<DOMDataTreeListeningException> causes;
80
81         Failure(final Collection<DOMDataTreeListeningException> causes) {
82             this.causes = Preconditions.checkNotNull(causes);
83         }
84     }
85
86     private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
87         @GuardedBy("this")
88         private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
89         @GuardedBy("this")
90         private final Collection<DataTreeCandidate> changes = new ArrayList<>();
91         @GuardedBy("this")
92         private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
93
94         @Override
95         protected void append(final State state) {
96             if (state instanceof Changes) {
97                 final Changes changesState = (Changes) state;
98                 this.changes.addAll(changesState.changes);
99                 subtrees = ImmutableMap.copyOf(changesState.subtrees);
100             } else if (state instanceof Failure) {
101                 causes.addAll(((Failure) state).causes);
102             } else {
103                 throw new IllegalStateException("Unexpected state " + state);
104             }
105         }
106
107         @Override
108         protected synchronized void appendInitial(final State state) {
109             // TODO: we could index and compress state changes here
110             if (state instanceof Changes) {
111                 final Changes changesState = (Changes) state;
112                 this.changes.addAll(changesState.changes);
113                 subtrees = ImmutableMap.copyOf(changesState.subtrees);
114             } else if (state instanceof Failure) {
115                 causes.addAll(((Failure) state).causes);
116             } else {
117                 throw new IllegalStateException("Unexpected state " + state);
118             }
119         }
120
121         @Override
122         public synchronized Aggregated build() {
123             final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
124                 ImmutableList.copyOf(causes));
125             changes.clear();
126             causes.clear();
127             return ret;
128         }
129     }
130
131     private static final class Operational extends AbstractStateAggregator.Operational<State> {
132         private final DOMDataTreeListener listener;
133         private boolean failed;
134
135         Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
136                 final DOMDataTreeListener listener) {
137             super(builders);
138             this.listener = Preconditions.checkNotNull(listener);
139         }
140
141         @Override
142         protected void notifyListener(final Iterator<State> iterator) {
143             if (failed) {
144                 iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", state));
145                 return;
146             }
147
148             final Stopwatch clock = Stopwatch.createStarted();
149             final List<DataTreeCandidate> changes = new ArrayList<>();
150             final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
151             final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
152             while (iterator.hasNext()) {
153                 collectState(iterator.next(), changes, subtrees, failures);
154             }
155
156             if (!changes.isEmpty()) {
157                 // Note: it is okay to leak changes, we must never leak mutable subtrees.
158                 callListener(listener, changes, subtrees.build());
159             }
160             if (!failures.isEmpty()) {
161                 failed = true;
162                 listener.onDataTreeFailed(failures);
163             }
164
165             LOG.trace("Listener {} notification completed in {}", listener, clock);
166         }
167     }
168
169     private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
170
171     // Because a component listener may report a failure before we finish registering all listeners, we need a way
172     // to trigger a failure report from the thread *not* performing the registration.
173     private static final Executor FAILURE_NOTIFIER;
174
175     static {
176         final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
177                 .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
178         FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
179     }
180
181     private final boolean allowRxMerges;
182
183     public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
184         super(sizeHint);
185         this.allowRxMerges = allowRxMerges;
186     }
187
188     public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
189             final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
190             final Function<T, DOMDataTreeShard> keyToShard) {
191         if (subtrees.size() == 1) {
192             final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
193                     .next();
194             return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
195                     .registerListener(listener, entry.getValue(), allowRxMerges);
196         }
197
198         // Alright, this the real deal, we have to aggregate.
199         final int size = subtrees.size();
200         final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
201         final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
202         for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
203             regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
204                 .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
205         }
206
207         return aggregator.start(listener, regs);
208     }
209
210     public DOMDataTreeListener createListener() {
211         // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
212         final StateBuilder builder = new StateBuilder();
213         addBuilder(builder);
214
215         return new DOMDataTreeListener() {
216             @Override
217             public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
218                 receiveState(builder, new Failure(causes));
219             }
220
221             @Override
222             public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
223                     final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
224                 receiveState(builder, new Changes(changes, subtrees));
225             }
226         };
227     }
228
229     public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
230             final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
231
232         final Started<State> result = start(builders -> start(listener, regs, builders));
233         if (result instanceof Failed) {
234             return new AbstractListenerRegistration<L>(listener) {
235                 @Override
236                 protected void removeRegistration() {
237                     // Listeners have already been closed, this is a no-op
238                 }
239             };
240         }
241
242         return new AbstractListenerRegistration<L>(listener) {
243             @Override
244             protected void removeRegistration() {
245                 regs.forEach(ListenerRegistration::close);
246             }
247         };
248     }
249
250     static Started<State> start(final DOMDataTreeListener listener,
251             final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
252             final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
253
254         final List<DataTreeCandidate> changes = new ArrayList<>();
255         final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
256         final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
257         for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
258             collectState(builder.build(), changes, subtrees, failures);
259         }
260
261         if (!failures.isEmpty()) {
262             regs.forEach(ListenerRegistration::close);
263             FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
264             return new Failed<>(builders);
265         }
266         if (!changes.isEmpty()) {
267             callListener(listener, changes, subtrees.build());
268         }
269
270         return new Operational(builders, listener);
271     }
272
273     @SuppressWarnings("checkstyle:IllegalCatch")
274     static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
275             final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
276         try {
277             listener.onDataTreeChanged(changes, subtrees);
278         } catch (Exception e) {
279             LOG.error("Listener {} failed to process initial changes", listener, e);
280         }
281     }
282
283     static void collectState(final State state, final Collection<DataTreeCandidate> changes,
284             final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
285             final Collection<DOMDataTreeListeningException> failures) {
286         Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
287         final Aggregated aggregated = (Aggregated) state;
288
289         subtrees.putAll(aggregated.subtrees);
290         changes.addAll(aggregated.changes);
291         failures.addAll(aggregated.failures);
292     }
293 }