/*
 * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.mdsal.dom.spi.shard;

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.Beta;
import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

41 * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
44 * @author Robert Varga
46 * @deprecated This interface is scheduled for removal in the next major release.
48 @Deprecated(forRemoval = true)
50 public final class DOMDataTreeListenerAggregator
51 extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
53 abstract static class State extends AbstractStateAggregator.State {
57 private static final class Aggregated extends State {
58 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
59 final Collection<DOMDataTreeListeningException> failures;
60 final Collection<DataTreeCandidate> changes;
62 Aggregated(final Collection<DataTreeCandidate> changes,
63 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
64 final Collection<DOMDataTreeListeningException> failures) {
65 this.changes = requireNonNull(changes);
66 this.subtrees = requireNonNull(subtrees);
67 this.failures = requireNonNull(failures);
71 private static final class Changes extends State {
72 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
73 final Collection<DataTreeCandidate> changes;
75 Changes(final Collection<DataTreeCandidate> changes,
76 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
77 this.changes = requireNonNull(changes);
78 this.subtrees = requireNonNull(subtrees);
82 private static final class Failure extends State {
83 final Collection<DOMDataTreeListeningException> causes;
85 Failure(final Collection<DOMDataTreeListeningException> causes) {
86 this.causes = requireNonNull(causes);
90 private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
92 private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
94 private final Collection<DataTreeCandidate> changes = new ArrayList<>();
96 private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
99 protected synchronized void append(final State state) {
100 if (state instanceof Changes) {
101 final Changes changesState = (Changes) state;
102 this.changes.addAll(changesState.changes);
103 subtrees = ImmutableMap.copyOf(changesState.subtrees);
104 } else if (state instanceof Failure) {
105 causes.addAll(((Failure) state).causes);
107 throw new IllegalStateException("Unexpected state " + state);
112 protected synchronized void appendInitial(final State state) {
113 // TODO: we could index and compress state changes here
114 if (state instanceof Changes) {
115 final Changes changesState = (Changes) state;
116 this.changes.addAll(changesState.changes);
117 subtrees = ImmutableMap.copyOf(changesState.subtrees);
118 } else if (state instanceof Failure) {
119 causes.addAll(((Failure) state).causes);
121 throw new IllegalStateException("Unexpected state " + state);
126 public synchronized Aggregated build() {
127 final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
128 ImmutableList.copyOf(causes));
135 private static final class Operational extends AbstractStateAggregator.Operational<State> {
136 private final DOMDataTreeListener listener;
137 private boolean failed;
139 Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
140 final DOMDataTreeListener listener) {
142 this.listener = requireNonNull(listener);
146 protected void notifyListener(final Iterator<State> iterator) {
148 iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", listener, state));
152 final Stopwatch clock = Stopwatch.createStarted();
153 final List<DataTreeCandidate> changes = new ArrayList<>();
154 final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
155 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
156 while (iterator.hasNext()) {
157 collectState(iterator.next(), changes, subtrees, failures);
160 if (!changes.isEmpty()) {
161 // Note: it is okay to leak changes, we must never leak mutable subtrees.
162 callListener(listener, changes, subtrees.build());
164 if (!failures.isEmpty()) {
166 listener.onDataTreeFailed(failures);
169 LOG.trace("Listener {} notification completed in {}", listener, clock);
173 private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
175 // Because a component listener may report a failure before we finish registering all listeners, we need a way
176 // to trigger a failure report from the thread *not* performing the registration.
177 private static final Executor FAILURE_NOTIFIER;
180 final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
181 .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
182 FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
185 private final boolean allowRxMerges;
187 public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
189 this.allowRxMerges = allowRxMerges;
192 public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
193 final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
194 final Function<T, DOMDataTreeShard> keyToShard) {
195 if (subtrees.size() == 1) {
196 final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
198 return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
199 .registerListener(listener, entry.getValue(), allowRxMerges);
202 // Alright, this the real deal, we have to aggregate.
203 final int size = subtrees.size();
204 final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
205 final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
206 for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
207 regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
208 .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
211 return aggregator.start(listener, regs);
214 public DOMDataTreeListener createListener() {
215 // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
216 final StateBuilder builder = new StateBuilder();
219 return new DOMDataTreeListener() {
221 public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
222 receiveState(builder, new Failure(causes));
226 public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
227 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
228 receiveState(builder, new Changes(changes, subtrees));
233 public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
234 final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
236 final Started<State> result = start(builders -> start(listener, regs, builders));
237 if (result instanceof Failed) {
238 return new AbstractListenerRegistration<>(listener) {
240 protected void removeRegistration() {
241 // Listeners have already been closed, this is a no-op
246 return new AbstractListenerRegistration<>(listener) {
248 protected void removeRegistration() {
249 regs.forEach(ListenerRegistration::close);
254 static Started<State> start(final DOMDataTreeListener listener,
255 final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
256 final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
258 final List<DataTreeCandidate> changes = new ArrayList<>();
259 final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
260 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
261 for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
262 collectState(builder.build(), changes, subtrees, failures);
265 if (!failures.isEmpty()) {
266 regs.forEach(ListenerRegistration::close);
267 FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
268 return new Failed<>(builders);
270 if (!changes.isEmpty()) {
271 callListener(listener, changes, subtrees.build());
274 return new Operational(builders, listener);
277 @SuppressWarnings("checkstyle:IllegalCatch")
278 static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
279 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
281 listener.onDataTreeChanged(changes, subtrees);
282 } catch (Exception e) {
283 LOG.error("Listener {} failed to process initial changes", listener, e);
287 static void collectState(final State state, final Collection<DataTreeCandidate> changes,
288 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
289 final Collection<DOMDataTreeListeningException> failures) {
290 Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
291 final Aggregated aggregated = (Aggregated) state;
293 subtrees.putAll(aggregated.subtrees);
294 changes.addAll(aggregated.changes);
295 failures.addAll(aggregated.failures);