import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+@Deprecated
class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implements AutoCloseable {
private final DOMDataTreeListener listener;
@GuardedBy("this")
private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> currentData = Collections.emptyMap();
- private ShardedDOMDataTreeListenerContext(T listener) {
+ private ShardedDOMDataTreeListenerContext(final T listener) {
for (LogicalDatastoreType type : LogicalDatastoreType.values()) {
storeListeners.put(type, new StoreListener(type));
}
listener.onDataTreeChanged(changesToNotify, currentData);
}
- void register(DOMDataTreeIdentifier subtree, DOMStoreTreeChangePublisher shard) {
+ void register(final DOMDataTreeIdentifier subtree, final DOMStoreTreeChangePublisher shard) {
ListenerRegistration<?> storeReg =
shard.registerTreeChangeListener(subtree.getRootIdentifier(),
storeListeners.get(subtree.getDatastoreType()));
private final LogicalDatastoreType type;
- StoreListener(LogicalDatastoreType type) {
+ StoreListener(final LogicalDatastoreType type) {
this.type = type;
}
@Override
- public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
receivedDataTreeChanges(type, changes);
scheduleNotification();
}
// FIXME: Should be able to run parallel to notifyListener and should honor
// allowRxMerges
- synchronized void receivedDataTreeChanges(LogicalDatastoreType type, Collection<DataTreeCandidate> changes) {
+ synchronized void receivedDataTreeChanges(final LogicalDatastoreType type,
+ final Collection<DataTreeCandidate> changes) {
Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> updatedData =
MapAdaptor.getDefaultInstance().takeSnapshot(currentData);
for (DataTreeCandidate change : changes) {
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yangtools.concepts.Builder;
+import org.opendaylight.yangtools.concepts.Immutable;
+import org.opendaylight.yangtools.concepts.Mutable;
+
+/**
+ * Aggregator which combines state reported by potentially multiple threads into a single state report. State is
+ * received concurrently to reports and reporter threads are hijacked when there is state to be reported and no thread
+ * is reporting it.
+ *
+ * @param <S> State type
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractStateAggregator<S extends AbstractStateAggregator.State> {
+
+ /**
+ * Marker interface for state as both reported up and down.
+ */
+ public abstract static class State implements Immutable {
+
+ }
+
+ /**
+ * State aggregator, which receives state chunks and creates an aggregated state object via the build method.
+ * Note all internal state must be protected by the the lock on the builder project itself.
+ *
+ * @param <S> State type
+ */
+ protected abstract static class StateBuilder<S extends State> implements Builder<S>, Mutable {
+
+ protected abstract void append(S state);
+
+ protected abstract void appendInitial(S state);
+ }
+
+ protected abstract static class Behavior<B extends Behavior<B, S>, S extends State> {
+
+ abstract Collection<StateBuilder<S>> builders();
+
+ abstract void receiveState(StateBuilder<S> builder, S state);
+ }
+
+ private static final class Starting<S extends State> extends Behavior<Starting<S>, S> {
+ private final Collection<StateBuilder<S>> builders;
+ @GuardedBy("this")
+ private Started<S> successor;
+
+ Starting(final int sizeHint) {
+ builders = new ArrayList<>(sizeHint);
+ }
+
+ void add(final StateBuilder<S> builder) {
+ builders.add(Preconditions.checkNotNull(builder));
+ }
+
+ @Override
+ Collection<StateBuilder<S>> builders() {
+ return builders;
+ }
+
+ @Override
+ synchronized void receiveState(final StateBuilder<S> builder, final S state) {
+ if (successor != null) {
+ successor.receiveState(builder, state);
+ return;
+ }
+
+ builder.appendInitial(state);
+ }
+
+ synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
+ Preconditions.checkState(successor == null, "Attempted to start an already-started aggregator");
+ final Started<S> next = Verify.verifyNotNull(function.apply(ImmutableList.copyOf(builders)));
+ successor = next;
+ return next;
+ }
+ }
+
+ protected abstract static class Started<S extends State> extends Behavior<Started<S>, S> {
+ private final Collection<StateBuilder<S>> builders;
+
+ Started(final Collection<? extends StateBuilder<S>> builders) {
+ this.builders = ImmutableList.copyOf(builders);
+ }
+
+ @Override
+ final Collection<StateBuilder<S>> builders() {
+ return builders;
+ }
+ }
+
+ protected static final class Failed<S extends State> extends Started<S> {
+ protected Failed(final Collection<? extends StateBuilder<S>> builders) {
+ super(builders);
+ }
+
+ @Override
+ void receiveState(final StateBuilder<S> builder, final S state) {
+ // Intentional no-op
+ }
+ }
+
+ protected abstract static class Operational<S extends State> extends Started<S> {
+ // Locking is a combination of a generation counter and a semaphore. Generation is bumped and remembered
+ // on stack when new state is being appended. Processed generations are recorded separately. This can cause
+ // false-positives when we loop on empty state, but that should not happen often and is harmless.
+ private final AtomicBoolean semaphore = new AtomicBoolean();
+ private final AtomicLong generation = new AtomicLong();
+
+ private volatile long processed;
+
+ protected Operational(final Collection<? extends StateBuilder<S>> builders) {
+ super(builders);
+ }
+
+ protected abstract void notifyListener(Iterator<S> iterator);
+
+ @Override
+ final void receiveState(final StateBuilder<S> builder, final S state) {
+ synchronized (builder) {
+ // Generation has to be bumbed atomically with state delivery, otherwise tryNotifyListener could
+ // observe state with after generation was bumped and before the state was appended
+ final long gen = generation.incrementAndGet();
+ try {
+ builder.append(state);
+ } finally {
+ tryNotifyListener(gen);
+ }
+ }
+ }
+
+ private void tryNotifyListener(final long initGen) {
+ long gen = initGen;
+
+ // We now have to re-sync, as we may end up being the last thread in position to observe the complete state
+ // of the queues. Since queues are updated independently to iteration, notifyListener() may have missed
+ // some updates, in which case we must eventually run it.
+ //
+ // Check if this generation was processed by someone else (while we were inserting items) or if there is
+ // somebody else already running this loop (which means they will re-check and spin again).
+ while (gen != processed && semaphore.compareAndSet(false, true)) {
+ try {
+ processed = gen;
+ notifyListener(Iterators.transform(builders().iterator(), StateBuilder::build));
+ } finally {
+ semaphore.set(false);
+ }
+
+ final long nextGen = generation.get();
+ if (nextGen == gen) {
+ // No modifications happened, we are done
+ return;
+ }
+
+ gen = nextGen;
+ }
+ }
+ }
+
+ private volatile Behavior<?, S> behavior;
+
+ protected AbstractStateAggregator(final int sizeHint) {
+ this.behavior = new Starting<>(sizeHint);
+ }
+
+ protected final void addBuilder(final StateBuilder<S> builder) {
+ checkStarting().add(builder);
+ }
+
+ protected final synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
+ final Started<S> ret = checkStarting().start(function);
+ behavior = ret;
+ return ret;
+ }
+
+ protected final void receiveState(final StateBuilder<S> builder, final S state) {
+ behavior.receiveState(builder, state);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Starting<S> checkStarting() {
+ final Behavior<?, S> local = behavior;
+ Preconditions.checkState(local instanceof Starting, "Unexpected behavior %s", local);
+ return (Starting<S>) local;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Compatibility bridge between {@link DOMDataTreeListenerRegistry} and {@link DOMStoreTreeChangePublisher}.
+ */
+@Beta
+@Deprecated
+public final class CompatDOMDataTreeListenerRegistry implements DOMDataTreeListenerRegistry {
+
+ private final DOMStoreTreeChangePublisher publisher;
+
+ public CompatDOMDataTreeListenerRegistry(final DOMStoreTreeChangePublisher publisher) {
+ this.publisher = requireNonNull(publisher);
+ }
+
+ @Override
+ public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
+ final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges) {
+ if (subtrees.size() == 1) {
+ final DOMDataTreeIdentifier treeId = Iterables.getOnlyElement(subtrees);
+
+ final ListenerRegistration<?> reg = publisher.registerTreeChangeListener(treeId.getRootIdentifier(),
+ changes -> {
+ final Optional<NormalizedNode<?, ?>> last = Iterables.getLast(changes).getRootNode().getDataAfter();
+ if (last.isPresent()) {
+ listener.onDataTreeChanged(changes, ImmutableMap.of(treeId, last.get()));
+ } else {
+ listener.onDataTreeChanged(changes, ImmutableMap.of());
+ }
+ });
+ return new AbstractListenerRegistration<T>(listener) {
+ @Override
+ protected void removeRegistration() {
+ reg.close();
+ }
+ };
+ }
+
+ final int size = subtrees.size();
+ final Collection<ListenerRegistration<?>> regs = new ArrayList<>(size);
+ final DOMDataTreeChangeListenerAggregator aggregator = new DOMDataTreeChangeListenerAggregator(size,
+ allowRxMerges);
+ for (DOMDataTreeIdentifier treeId : subtrees) {
+ regs.add(publisher.registerTreeChangeListener(treeId.getRootIdentifier(),
+ aggregator.createListener(treeId)));
+ }
+
+ return aggregator.start(listener, regs);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ForwardingObject;
+import java.util.Collection;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compatibility layer between {@link DOMStoreTreeChangePublisher} and {@link ListenableDOMDataTreeShard}. Required
+ * for migration purposes.
+ *
+ * @author Robert Varga
+ *
+ * @deprecated This class is scheduled for removal when we remove compatibility with dom.spi.store APIs.
+ */
+@Deprecated
+public final class CompatListenableDOMDataTreeShard extends ForwardingObject implements ListenableDOMDataTreeShard {
+ private static final Logger LOG = LoggerFactory.getLogger(CompatListenableDOMDataTreeShard.class);
+
+ private final CompatDOMDataTreeListenerRegistry publisher;
+ private final DOMDataTreeShard delegate;
+
+ private CompatListenableDOMDataTreeShard(final DOMDataTreeShard delegate) {
+ this.delegate = requireNonNull(delegate);
+ checkArgument(delegate instanceof DOMStoreTreeChangePublisher);
+ this.publisher = new CompatDOMDataTreeListenerRegistry((DOMStoreTreeChangePublisher) delegate);
+ }
+
+ public static ListenableDOMDataTreeShard createIfNeeded(final DOMDataTreeShard delegate) {
+ if (delegate instanceof ListenableDOMDataTreeShard) {
+ return (ListenableDOMDataTreeShard) delegate;
+ }
+
+ LOG.debug("Using compatibility adaptor for {}", delegate);
+ return new CompatListenableDOMDataTreeShard(delegate);
+ }
+
+ @Override
+ protected DOMDataTreeShard delegate() {
+ return delegate;
+ }
+
+ @Override
+ public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ delegate.onChildAttached(prefix, child);
+ }
+
+ @Override
+ public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+ delegate.onChildDetached(prefix, child);
+ }
+
+ @Override
+ public <L extends DOMDataTreeListener> ListenerRegistration<L> registerListener(final L listener,
+ final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges) {
+ return publisher.registerListener(listener, subtrees, allowRxMerges);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A compatibility class for bridging DOMDataTreeChangeListener, which can listen on only single subtree with
+ * {@link DOMDataTreeListener} interface.
+ *
+ * @author Robert Varga
+ * @deprecated This class is scheduled for removal when we remove compatibility with dom.spi.store APIs.
+ */
+@Deprecated
+final class DOMDataTreeChangeListenerAggregator
+ extends AbstractStateAggregator<DOMDataTreeChangeListenerAggregator.State> {
+
+ static final class State extends AbstractStateAggregator.State implements Identifiable<DOMDataTreeIdentifier> {
+ private final DOMDataTreeIdentifier identifier;
+ final List<DataTreeCandidate> changes;
+
+ State(final DOMDataTreeIdentifier identifier, final List<DataTreeCandidate> changes) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.changes = Preconditions.checkNotNull(changes);
+ }
+
+ @Override
+ public DOMDataTreeIdentifier getIdentifier() {
+ return identifier;
+ }
+ }
+
+ private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
+ @GuardedBy("this")
+ private final List<DataTreeCandidate> changes = new ArrayList<>();
+ private final DOMDataTreeIdentifier identifier;
+
+ StateBuilder(final DOMDataTreeIdentifier identifier) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ }
+
+ @Override
+ protected synchronized void append(final State state) {
+ changes.addAll(state.changes);
+ }
+
+ @Override
+ protected synchronized void appendInitial(final State state) {
+ // We are still starting up, so all we need to do is squash reported changes to an initial write event
+ final DataTreeCandidate last = Iterables.getLast(state.changes);
+ changes.clear();
+ final Optional<NormalizedNode<?, ?>> lastData = last.getRootNode().getDataAfter();
+ if (lastData.isPresent()) {
+ changes.add(DataTreeCandidates.fromNormalizedNode(last.getRootPath(), lastData.get()));
+ }
+ }
+
+ @Override
+ public synchronized State build() {
+ final State ret = new State(identifier, ImmutableList.copyOf(changes));
+ changes.clear();
+ return ret;
+ }
+ }
+
+ private static final class Operational extends AbstractStateAggregator.Operational<State> {
+ private final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = new HashMap<>();
+ private final DOMDataTreeListener listener;
+
+ Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
+ final DOMDataTreeListener listener) {
+ super(builders);
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ protected void notifyListener(final Iterator<State> iterator) {
+ final Stopwatch clock = Stopwatch.createStarted();
+ final List<DataTreeCandidate> changes = new ArrayList<>();
+ while (iterator.hasNext()) {
+ final State state = iterator.next();
+ final List<DataTreeCandidate> candidates = state.changes;
+ if (!candidates.isEmpty()) {
+ // Update current subtree snapshot based on last candidate node
+ final DataTreeCandidateNode lastRoot = candidates.get(candidates.size() - 1).getRootNode();
+ final Optional<NormalizedNode<?, ?>> optData = lastRoot.getDataAfter();
+ if (optData.isPresent()) {
+ subtrees.put(state.getIdentifier(), optData.get());
+ } else {
+ subtrees.remove(state.getIdentifier());
+ }
+
+ // Append changes
+ changes.addAll(candidates);
+ }
+ }
+
+ final int size = changes.size();
+ if (size != 0) {
+ // Note: it is okay to leak changes, we must never leak mutable subtrees.
+ listener.onDataTreeChanged(changes, ImmutableMap.copyOf(subtrees));
+ LOG.trace("Listener {} processed {} changes in {}", listener, clock);
+ }
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeChangeListenerAggregator.class);
+
+ private final boolean allowRxMerges;
+
+ DOMDataTreeChangeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
+ super(sizeHint);
+ this.allowRxMerges = allowRxMerges;
+ }
+
+ DOMDataTreeChangeListener createListener(final DOMDataTreeIdentifier treeId) {
+ // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
+ final StateBuilder builder = new StateBuilder(treeId);
+ addBuilder(builder);
+
+ return changes -> receiveState(builder, new State(treeId, ImmutableList.copyOf(changes)));
+ }
+
+ <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
+ final Collection<ListenerRegistration<?>> regs) {
+ start(builders -> {
+ final Operational ret = new Operational(builders, listener);
+ ret.notifyListener(Iterators.transform(builders.iterator(), AbstractStateAggregator.StateBuilder::build));
+ return ret;
+ });
+
+ return new AbstractListenerRegistration<L>(listener) {
+ @Override
+ protected void removeRegistration() {
+ regs.forEach(ListenerRegistration::close);
+ }
+ };
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
+ * listener.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DOMDataTreeListenerAggregator
+ extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
+
+ abstract static class State extends AbstractStateAggregator.State {
+
+ }
+
+ private static final class Aggregated extends State {
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
+ final Collection<DOMDataTreeListeningException> failures;
+ final Collection<DataTreeCandidate> changes;
+
+ Aggregated(final Collection<DataTreeCandidate> changes,
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
+ final Collection<DOMDataTreeListeningException> failures) {
+ this.changes = Preconditions.checkNotNull(changes);
+ this.subtrees = Preconditions.checkNotNull(subtrees);
+ this.failures = Preconditions.checkNotNull(failures);
+ }
+ }
+
+ private static final class Changes extends State {
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
+ final Collection<DataTreeCandidate> changes;
+
+ Changes(final Collection<DataTreeCandidate> changes,
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+ this.changes = Preconditions.checkNotNull(changes);
+ this.subtrees = Preconditions.checkNotNull(subtrees);
+ }
+ }
+
+ private static final class Failure extends State {
+ final Collection<DOMDataTreeListeningException> causes;
+
+ Failure(final Collection<DOMDataTreeListeningException> causes) {
+ this.causes = Preconditions.checkNotNull(causes);
+ }
+ }
+
+ private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
+ @GuardedBy("this")
+ private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
+ @GuardedBy("this")
+ private final Collection<DataTreeCandidate> changes = new ArrayList<>();
+ @GuardedBy("this")
+ private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
+
+ @Override
+ protected void append(final State state) {
+ if (state instanceof Changes) {
+ final Changes changes = (Changes) state;
+ this.changes.addAll(changes.changes);
+ subtrees = ImmutableMap.copyOf(changes.subtrees);
+ } else if (state instanceof Failure) {
+ causes.addAll(((Failure) state).causes);
+ } else {
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+ }
+
+ @Override
+ protected synchronized void appendInitial(final State state) {
+ // TODO: we could index and compress state changes here
+ if (state instanceof Changes) {
+ final Changes changes = (Changes) state;
+ this.changes.addAll(changes.changes);
+ subtrees = ImmutableMap.copyOf(changes.subtrees);
+ } else if (state instanceof Failure) {
+ causes.addAll(((Failure) state).causes);
+ } else {
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+ }
+
+ @Override
+ public synchronized Aggregated build() {
+ final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
+ ImmutableList.copyOf(causes));
+ changes.clear();
+ causes.clear();
+ return ret;
+ }
+ }
+
+ private static final class Operational extends AbstractStateAggregator.Operational<State> {
+ private final DOMDataTreeListener listener;
+ private boolean failed;
+
+ Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
+ final DOMDataTreeListener listener) {
+ super(builders);
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ protected void notifyListener(final Iterator<State> iterator) {
+ if (failed) {
+ iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", state));
+ return;
+ }
+
+ final Stopwatch clock = Stopwatch.createStarted();
+ final List<DataTreeCandidate> changes = new ArrayList<>();
+ final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
+ final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
+ while (iterator.hasNext()) {
+ collectState(iterator.next(), changes, subtrees, failures);
+ }
+
+ if (!changes.isEmpty()) {
+ // Note: it is okay to leak changes, we must never leak mutable subtrees.
+ callListener(listener, changes, subtrees.build());
+ }
+ if (!failures.isEmpty()) {
+ failed = true;
+ listener.onDataTreeFailed(failures);
+ }
+
+ LOG.trace("Listener {} notification completed in {}", listener, clock);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
+
+ // Because a component listener may report a failure before we finish registering all listeners, we need a way
+ // to trigger a failure report from the thread *not* performing the registration.
+ private static final Executor FAILURE_NOTIFIER;
+
+ static {
+ final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
+ FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
+ }
+
+ private final boolean allowRxMerges;
+
+ public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
+ super(sizeHint);
+ this.allowRxMerges = allowRxMerges;
+ }
+
+ public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
+ final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
+ final Function<T, DOMDataTreeShard> keyToShard) {
+ if (subtrees.size() == 1) {
+ final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
+ .next();
+ return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
+ .registerListener(listener, entry.getValue(), allowRxMerges);
+ }
+
+ // Alright, this the real deal, we have to aggregate.
+ final int size = subtrees.size();
+ final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
+ final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
+ for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
+ regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
+ .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
+ }
+
+ return aggregator.start(listener, regs);
+ }
+
+ public DOMDataTreeListener createListener() {
+ // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
+ final StateBuilder builder = new StateBuilder();
+ addBuilder(builder);
+
+ return new DOMDataTreeListener() {
+ @Override
+ public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
+ receiveState(builder, new Failure(causes));
+ }
+
+ @Override
+ public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+ receiveState(builder, new Changes(changes, subtrees));
+ }
+ };
+ }
+
+ public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
+ final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
+
+ final Started<State> result = start(builders -> start(listener, regs, builders));
+ if (result instanceof Failed) {
+ return new AbstractListenerRegistration<L>(listener) {
+ @Override
+ protected void removeRegistration() {
+ // Listeners have already been closed, this is a no-op
+ }
+ };
+ }
+
+ return new AbstractListenerRegistration<L>(listener) {
+ @Override
+ protected void removeRegistration() {
+ regs.forEach(ListenerRegistration::close);
+ }
+ };
+ }
+
+ static Started<State> start(final DOMDataTreeListener listener,
+ final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
+ final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
+
+ final List<DataTreeCandidate> changes = new ArrayList<>();
+ final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
+ final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
+ for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
+ collectState(builder.build(), changes, subtrees, failures);
+ }
+
+ if (!failures.isEmpty()) {
+ regs.forEach(ListenerRegistration::close);
+ FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
+ return new Failed<>(builders);
+ }
+ if (!changes.isEmpty()) {
+ callListener(listener, changes, subtrees.build());
+ }
+
+ return new Operational(builders, listener);
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
+ final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+ try {
+ listener.onDataTreeChanged(changes, subtrees);
+ } catch (Exception e) {
+ LOG.error("Listener {} failed to process initial changes", listener, e);
+ }
+ }
+
+ static void collectState(final State state, final Collection<DataTreeCandidate> changes,
+ final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
+ final Collection<DOMDataTreeListeningException> failures) {
+ Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
+ final Aggregated aggregated = (Aggregated) state;
+
+ subtrees.putAll(aggregated.subtrees);
+ changes.addAll(aggregated.changes);
+ failures.addAll(aggregated.failures);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Utility interface for objects dispatching events to {@link DOMDataTreeListener}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface DOMDataTreeListenerRegistry {
+
+ @Nonnull <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(@Nonnull T listener,
+ @Nonnull Collection<DOMDataTreeIdentifier> subtrees, boolean allowRxMerges);
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+
+/**
+ * A {@link DOMDataTreeShard} which allows registration of listeners, allowing realization of the DOMDataTreeService's
+ * registerListener contract. Note that producer/consumer as well as the logical data store type are taken care of
+ * by the caller, hence implementations of this interface only need to take care of communicating with their subshards.
+ */
+@Beta
+public interface ListenableDOMDataTreeShard extends DOMDataTreeShard, DOMDataTreeListenerRegistry {
+
+}
/**
* Marker interface for readable/writeable DOMDataTreeShard.
+ *
+ * @deprecated Use {@link ListenableDOMDataTreeShard} instead.
*/
+@Deprecated
@Beta
public interface ReadableWriteableDOMDataTreeShard extends DOMStoreTreeChangePublisher, WriteableDOMDataTreeShard {
}
/**
* Create a producer that has the ability to write into the provided subtrees.
+ *
* @param paths Subtrees that the caller wants to write into.
* @return Producer.
*/