2 * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.spi.shard;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.Beta;
13 import com.google.common.base.Stopwatch;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.ImmutableMap.Builder;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Iterator;
22 import java.util.List;
24 import java.util.Map.Entry;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.Executors;
27 import java.util.function.Function;
28 import javax.annotation.concurrent.GuardedBy;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
33 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
44 * @author Robert Varga
47 public final class DOMDataTreeListenerAggregator
48 extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
50 abstract static class State extends AbstractStateAggregator.State {
54 private static final class Aggregated extends State {
55 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
56 final Collection<DOMDataTreeListeningException> failures;
57 final Collection<DataTreeCandidate> changes;
59 Aggregated(final Collection<DataTreeCandidate> changes,
60 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
61 final Collection<DOMDataTreeListeningException> failures) {
62 this.changes = requireNonNull(changes);
63 this.subtrees = requireNonNull(subtrees);
64 this.failures = requireNonNull(failures);
68 private static final class Changes extends State {
69 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
70 final Collection<DataTreeCandidate> changes;
72 Changes(final Collection<DataTreeCandidate> changes,
73 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
74 this.changes = requireNonNull(changes);
75 this.subtrees = requireNonNull(subtrees);
79 private static final class Failure extends State {
80 final Collection<DOMDataTreeListeningException> causes;
82 Failure(final Collection<DOMDataTreeListeningException> causes) {
83 this.causes = requireNonNull(causes);
87 private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
89 private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
91 private final Collection<DataTreeCandidate> changes = new ArrayList<>();
93 private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
96 protected void append(final State state) {
97 if (state instanceof Changes) {
98 final Changes changesState = (Changes) state;
99 this.changes.addAll(changesState.changes);
100 subtrees = ImmutableMap.copyOf(changesState.subtrees);
101 } else if (state instanceof Failure) {
102 causes.addAll(((Failure) state).causes);
104 throw new IllegalStateException("Unexpected state " + state);
109 protected synchronized void appendInitial(final State state) {
110 // TODO: we could index and compress state changes here
111 if (state instanceof Changes) {
112 final Changes changesState = (Changes) state;
113 this.changes.addAll(changesState.changes);
114 subtrees = ImmutableMap.copyOf(changesState.subtrees);
115 } else if (state instanceof Failure) {
116 causes.addAll(((Failure) state).causes);
118 throw new IllegalStateException("Unexpected state " + state);
123 public synchronized Aggregated build() {
124 final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
125 ImmutableList.copyOf(causes));
132 private static final class Operational extends AbstractStateAggregator.Operational<State> {
133 private final DOMDataTreeListener listener;
134 private boolean failed;
136 Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
137 final DOMDataTreeListener listener) {
139 this.listener = requireNonNull(listener);
143 protected void notifyListener(final Iterator<State> iterator) {
145 iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", state));
149 final Stopwatch clock = Stopwatch.createStarted();
150 final List<DataTreeCandidate> changes = new ArrayList<>();
151 final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
152 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
153 while (iterator.hasNext()) {
154 collectState(iterator.next(), changes, subtrees, failures);
157 if (!changes.isEmpty()) {
158 // Note: it is okay to leak changes, we must never leak mutable subtrees.
159 callListener(listener, changes, subtrees.build());
161 if (!failures.isEmpty()) {
163 listener.onDataTreeFailed(failures);
166 LOG.trace("Listener {} notification completed in {}", listener, clock);
170 private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
172 // Because a component listener may report a failure before we finish registering all listeners, we need a way
173 // to trigger a failure report from the thread *not* performing the registration.
174 private static final Executor FAILURE_NOTIFIER;
177 final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
178 .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
179 FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
182 private final boolean allowRxMerges;
184 public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
186 this.allowRxMerges = allowRxMerges;
189 public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
190 final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
191 final Function<T, DOMDataTreeShard> keyToShard) {
192 if (subtrees.size() == 1) {
193 final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
195 return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
196 .registerListener(listener, entry.getValue(), allowRxMerges);
199 // Alright, this the real deal, we have to aggregate.
200 final int size = subtrees.size();
201 final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
202 final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
203 for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
204 regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
205 .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
208 return aggregator.start(listener, regs);
211 public DOMDataTreeListener createListener() {
212 // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
213 final StateBuilder builder = new StateBuilder();
216 return new DOMDataTreeListener() {
218 public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
219 receiveState(builder, new Failure(causes));
223 public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
224 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
225 receiveState(builder, new Changes(changes, subtrees));
230 public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
231 final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
233 final Started<State> result = start(builders -> start(listener, regs, builders));
234 if (result instanceof Failed) {
235 return new AbstractListenerRegistration<L>(listener) {
237 protected void removeRegistration() {
238 // Listeners have already been closed, this is a no-op
243 return new AbstractListenerRegistration<L>(listener) {
245 protected void removeRegistration() {
246 regs.forEach(ListenerRegistration::close);
251 static Started<State> start(final DOMDataTreeListener listener,
252 final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
253 final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
255 final List<DataTreeCandidate> changes = new ArrayList<>();
256 final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
257 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
258 for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
259 collectState(builder.build(), changes, subtrees, failures);
262 if (!failures.isEmpty()) {
263 regs.forEach(ListenerRegistration::close);
264 FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
265 return new Failed<>(builders);
267 if (!changes.isEmpty()) {
268 callListener(listener, changes, subtrees.build());
271 return new Operational(builders, listener);
274 @SuppressWarnings("checkstyle:IllegalCatch")
275 static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
276 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
278 listener.onDataTreeChanged(changes, subtrees);
279 } catch (Exception e) {
280 LOG.error("Listener {} failed to process initial changes", listener, e);
284 static void collectState(final State state, final Collection<DataTreeCandidate> changes,
285 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
286 final Collection<DOMDataTreeListeningException> failures) {
287 Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
288 final Aggregated aggregated = (Aggregated) state;
290 subtrees.putAll(aggregated.subtrees);
291 changes.addAll(aggregated.changes);
292 failures.addAll(aggregated.failures);