2 * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.spi.shard;
10 import com.google.common.annotations.Beta;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Stopwatch;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.ImmutableMap.Builder;
17 import com.google.common.util.concurrent.ThreadFactoryBuilder;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.Iterator;
21 import java.util.List;
23 import java.util.Map.Entry;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.Executors;
26 import java.util.function.Function;
27 import javax.annotation.concurrent.GuardedBy;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
32 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
43 * @author Robert Varga
46 public final class DOMDataTreeListenerAggregator
47 extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
49 abstract static class State extends AbstractStateAggregator.State {
53 private static final class Aggregated extends State {
54 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
55 final Collection<DOMDataTreeListeningException> failures;
56 final Collection<DataTreeCandidate> changes;
58 Aggregated(final Collection<DataTreeCandidate> changes,
59 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
60 final Collection<DOMDataTreeListeningException> failures) {
61 this.changes = Preconditions.checkNotNull(changes);
62 this.subtrees = Preconditions.checkNotNull(subtrees);
63 this.failures = Preconditions.checkNotNull(failures);
67 private static final class Changes extends State {
68 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
69 final Collection<DataTreeCandidate> changes;
71 Changes(final Collection<DataTreeCandidate> changes,
72 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
73 this.changes = Preconditions.checkNotNull(changes);
74 this.subtrees = Preconditions.checkNotNull(subtrees);
78 private static final class Failure extends State {
79 final Collection<DOMDataTreeListeningException> causes;
81 Failure(final Collection<DOMDataTreeListeningException> causes) {
82 this.causes = Preconditions.checkNotNull(causes);
86 private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
88 private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
90 private final Collection<DataTreeCandidate> changes = new ArrayList<>();
92 private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
95 protected void append(final State state) {
96 if (state instanceof Changes) {
97 final Changes changesState = (Changes) state;
98 this.changes.addAll(changesState.changes);
99 subtrees = ImmutableMap.copyOf(changesState.subtrees);
100 } else if (state instanceof Failure) {
101 causes.addAll(((Failure) state).causes);
103 throw new IllegalStateException("Unexpected state " + state);
108 protected synchronized void appendInitial(final State state) {
109 // TODO: we could index and compress state changes here
110 if (state instanceof Changes) {
111 final Changes changesState = (Changes) state;
112 this.changes.addAll(changesState.changes);
113 subtrees = ImmutableMap.copyOf(changesState.subtrees);
114 } else if (state instanceof Failure) {
115 causes.addAll(((Failure) state).causes);
117 throw new IllegalStateException("Unexpected state " + state);
122 public synchronized Aggregated build() {
123 final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
124 ImmutableList.copyOf(causes));
131 private static final class Operational extends AbstractStateAggregator.Operational<State> {
132 private final DOMDataTreeListener listener;
133 private boolean failed;
135 Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
136 final DOMDataTreeListener listener) {
138 this.listener = Preconditions.checkNotNull(listener);
142 protected void notifyListener(final Iterator<State> iterator) {
144 iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", state));
148 final Stopwatch clock = Stopwatch.createStarted();
149 final List<DataTreeCandidate> changes = new ArrayList<>();
150 final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
151 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
152 while (iterator.hasNext()) {
153 collectState(iterator.next(), changes, subtrees, failures);
156 if (!changes.isEmpty()) {
157 // Note: it is okay to leak changes, we must never leak mutable subtrees.
158 callListener(listener, changes, subtrees.build());
160 if (!failures.isEmpty()) {
162 listener.onDataTreeFailed(failures);
165 LOG.trace("Listener {} notification completed in {}", listener, clock);
169 private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
171 // Because a component listener may report a failure before we finish registering all listeners, we need a way
172 // to trigger a failure report from the thread *not* performing the registration.
173 private static final Executor FAILURE_NOTIFIER;
176 final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
177 .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
178 FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
181 private final boolean allowRxMerges;
183 public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
185 this.allowRxMerges = allowRxMerges;
188 public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
189 final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
190 final Function<T, DOMDataTreeShard> keyToShard) {
191 if (subtrees.size() == 1) {
192 final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
194 return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
195 .registerListener(listener, entry.getValue(), allowRxMerges);
198 // Alright, this the real deal, we have to aggregate.
199 final int size = subtrees.size();
200 final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
201 final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
202 for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
203 regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
204 .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
207 return aggregator.start(listener, regs);
210 public DOMDataTreeListener createListener() {
211 // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
212 final StateBuilder builder = new StateBuilder();
215 return new DOMDataTreeListener() {
217 public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
218 receiveState(builder, new Failure(causes));
222 public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
223 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
224 receiveState(builder, new Changes(changes, subtrees));
229 public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
230 final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
232 final Started<State> result = start(builders -> start(listener, regs, builders));
233 if (result instanceof Failed) {
234 return new AbstractListenerRegistration<L>(listener) {
236 protected void removeRegistration() {
237 // Listeners have already been closed, this is a no-op
242 return new AbstractListenerRegistration<L>(listener) {
244 protected void removeRegistration() {
245 regs.forEach(ListenerRegistration::close);
250 static Started<State> start(final DOMDataTreeListener listener,
251 final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
252 final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
254 final List<DataTreeCandidate> changes = new ArrayList<>();
255 final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
256 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
257 for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
258 collectState(builder.build(), changes, subtrees, failures);
261 if (!failures.isEmpty()) {
262 regs.forEach(ListenerRegistration::close);
263 FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
264 return new Failed<>(builders);
266 if (!changes.isEmpty()) {
267 callListener(listener, changes, subtrees.build());
270 return new Operational(builders, listener);
273 @SuppressWarnings("checkstyle:IllegalCatch")
274 static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
275 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
277 listener.onDataTreeChanged(changes, subtrees);
278 } catch (Exception e) {
279 LOG.error("Listener {} failed to process initial changes", listener, e);
283 static void collectState(final State state, final Collection<DataTreeCandidate> changes,
284 final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
285 final Collection<DOMDataTreeListeningException> failures) {
286 Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
287 final Aggregated aggregated = (Aggregated) state;
289 subtrees.putAll(aggregated.subtrees);
290 changes.addAll(aggregated.changes);
291 failures.addAll(aggregated.failures);