BUG-8733: Add ListenableDOMDataTreeShard 30/60630/26
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 21 Jul 2017 09:45:24 +0000 (11:45 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 31 Jul 2017 12:34:54 +0000 (14:34 +0200)
Implementation reliance on DOMDataTreeChangePublisher is a mistake
coming from similarities between interfaces. DOMDataTreeShard interfaces
should work with DOMDataTreeListener instances, not DOMDataTreeChangeListener.

This patch introduces ListenableDOMDataTreeShard, which exposes a proper
SPI-level method for registering DOMDataTreeListeners.

It also adds AbstractStateAggregator, which can be used to efficiently
aggregate multiple listeners into a single upcall, without the synchronization
overhead of ShardedDOMDataTreeListenerContext.

This class is then used to build DOMDataTreeChangeListenerAggregator for
bridging the old approach with ListenableDOMDataTreeShard via a utility
proxy, CompatListenableDOMDataTreeShard.

We also introduce DOMDataTreeListenerAggregator, which performs aggregation
of multiple DOMDataTreeListeners and is useful for ListenableDOMDataTreeShard
implementations where requested subtrees live in multiple child shards.

Change-Id: I979610e032605ade6d68196d51ae62778311f8c6
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/AbstractStateAggregator.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/CompatDOMDataTreeListenerRegistry.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/CompatListenableDOMDataTreeShard.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeChangeListenerAggregator.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeListenerAggregator.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeListenerRegistry.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/ListenableDOMDataTreeShard.java [new file with mode: 0644]
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/ReadableWriteableDOMDataTreeShard.java
dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/WriteableDOMDataTreeShard.java

index 4efe0dc5b40276d082e7829fdd0b9d5bf312b9df..5713bece9bb86bcf03fb33ad3d5880fa3cc02c4f 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.yangtools.util.MapAdaptor;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
+@Deprecated
 class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implements AutoCloseable {
 
     private final DOMDataTreeListener listener;
@@ -38,7 +39,7 @@ class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implement
     @GuardedBy("this")
     private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> currentData = Collections.emptyMap();
 
-    private ShardedDOMDataTreeListenerContext(T listener) {
+    private ShardedDOMDataTreeListenerContext(final T listener) {
         for (LogicalDatastoreType type : LogicalDatastoreType.values()) {
             storeListeners.put(type, new StoreListener(type));
         }
@@ -55,7 +56,7 @@ class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implement
         listener.onDataTreeChanged(changesToNotify, currentData);
     }
 
-    void register(DOMDataTreeIdentifier subtree, DOMStoreTreeChangePublisher shard) {
+    void register(final DOMDataTreeIdentifier subtree, final DOMStoreTreeChangePublisher shard) {
         ListenerRegistration<?> storeReg =
                 shard.registerTreeChangeListener(subtree.getRootIdentifier(),
                         storeListeners.get(subtree.getDatastoreType()));
@@ -66,12 +67,12 @@ class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implement
 
         private final LogicalDatastoreType type;
 
-        StoreListener(LogicalDatastoreType type) {
+        StoreListener(final LogicalDatastoreType type) {
             this.type = type;
         }
 
         @Override
-        public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+        public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
             receivedDataTreeChanges(type, changes);
             scheduleNotification();
         }
@@ -80,7 +81,8 @@ class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implement
 
     // FIXME: Should be able to run parallel to notifyListener and should honor
     // allowRxMerges
-    synchronized void receivedDataTreeChanges(LogicalDatastoreType type, Collection<DataTreeCandidate> changes) {
+    synchronized void receivedDataTreeChanges(final LogicalDatastoreType type,
+            final Collection<DataTreeCandidate> changes) {
         Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> updatedData =
                 MapAdaptor.getDefaultInstance().takeSnapshot(currentData);
         for (DataTreeCandidate change : changes) {
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/AbstractStateAggregator.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/AbstractStateAggregator.java
new file mode 100644 (file)
index 0000000..5878db9
--- /dev/null
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yangtools.concepts.Builder;
+import org.opendaylight.yangtools.concepts.Immutable;
+import org.opendaylight.yangtools.concepts.Mutable;
+
+/**
+ * Aggregator which combines state reported by potentially multiple threads into a single state report. State is
+ * received concurrently to reports and reporter threads are hijacked when there is state to be reported and no thread
+ * is reporting it.
+ *
+ * @param <S> State type
+ * @author Robert Varga
+ */
+@Beta
+public abstract class AbstractStateAggregator<S extends AbstractStateAggregator.State> {
+
+    /**
+     * Marker interface for state as both reported up and down.
+     */
+    public abstract static class State implements Immutable {
+
+    }
+
+    /**
+     * State aggregator, which receives state chunks and creates an aggregated state object via the build method.
+     * Note all internal state must be protected by the the lock on the builder project itself.
+     *
+     * @param <S> State type
+     */
+    protected abstract static class StateBuilder<S extends State> implements Builder<S>, Mutable {
+
+        protected abstract void append(S state);
+
+        protected abstract void appendInitial(S state);
+    }
+
+    protected abstract static class Behavior<B extends Behavior<B, S>, S extends State> {
+
+        abstract Collection<StateBuilder<S>> builders();
+
+        abstract void receiveState(StateBuilder<S> builder, S state);
+    }
+
+    private static final class Starting<S extends State> extends Behavior<Starting<S>, S> {
+        private final Collection<StateBuilder<S>> builders;
+        @GuardedBy("this")
+        private Started<S> successor;
+
+        Starting(final int sizeHint) {
+            builders = new ArrayList<>(sizeHint);
+        }
+
+        void add(final StateBuilder<S> builder) {
+            builders.add(Preconditions.checkNotNull(builder));
+        }
+
+        @Override
+        Collection<StateBuilder<S>> builders() {
+            return builders;
+        }
+
+        @Override
+        synchronized void receiveState(final StateBuilder<S> builder, final S state) {
+            if (successor != null) {
+                successor.receiveState(builder, state);
+                return;
+            }
+
+            builder.appendInitial(state);
+        }
+
+        synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
+            Preconditions.checkState(successor == null, "Attempted to start an already-started aggregator");
+            final Started<S> next = Verify.verifyNotNull(function.apply(ImmutableList.copyOf(builders)));
+            successor = next;
+            return next;
+        }
+    }
+
+    protected abstract static class Started<S extends State> extends Behavior<Started<S>, S> {
+        private final Collection<StateBuilder<S>> builders;
+
+        Started(final Collection<? extends StateBuilder<S>> builders) {
+            this.builders = ImmutableList.copyOf(builders);
+        }
+
+        @Override
+        final Collection<StateBuilder<S>> builders() {
+            return builders;
+        }
+    }
+
+    protected static final class Failed<S extends State> extends Started<S> {
+        protected Failed(final Collection<? extends StateBuilder<S>> builders) {
+            super(builders);
+        }
+
+        @Override
+        void receiveState(final StateBuilder<S> builder, final S state) {
+            // Intentional no-op
+        }
+    }
+
+    protected abstract static class Operational<S extends State> extends Started<S> {
+        // Locking is a combination of a generation counter and a semaphore. Generation is bumped and remembered
+        // on stack when new state is being appended. Processed generations are recorded separately. This can cause
+        // false-positives when we loop on empty state, but that should not happen often and is harmless.
+        private final AtomicBoolean semaphore = new AtomicBoolean();
+        private final AtomicLong generation = new AtomicLong();
+
+        private volatile long processed;
+
+        protected Operational(final Collection<? extends StateBuilder<S>> builders) {
+            super(builders);
+        }
+
+        protected abstract void notifyListener(final Iterator<S> iterator);
+
+        @Override
+        final void receiveState(final StateBuilder<S> builder, final S state) {
+            synchronized (builder) {
+                // Generation has to be bumbed atomically with state delivery, otherwise tryNotifyListener could
+                // observe state with after generation was bumped and before the state was appended
+                final long gen = generation.incrementAndGet();
+                try {
+                    builder.append(state);
+                } finally {
+                    tryNotifyListener(gen);
+                }
+            }
+        }
+
+        private void tryNotifyListener(final long initGen) {
+            long gen = initGen;
+
+            // We now have to re-sync, as we may end up being the last thread in position to observe the complete state
+            // of the queues. Since queues are updated independently to iteration, notifyListener() may have missed
+            // some updates, in which case we must eventually run it.
+            //
+            // Check if this generation was processed by someone else (while we were inserting items) or if there is
+            // somebody else already running this loop (which means they will re-check and spin again).
+            while (gen != processed && semaphore.compareAndSet(false, true)) {
+                try {
+                    processed = gen;
+                    notifyListener(Iterators.transform(builders().iterator(), StateBuilder::build));
+                } finally {
+                    semaphore.set(false);
+                }
+
+                final long nextGen = generation.get();
+                if (nextGen == gen) {
+                    // No modifications happened, we are done
+                    return;
+                }
+
+                gen = nextGen;
+            }
+        }
+    }
+
+    private volatile Behavior<?, S> behavior;
+
+    protected AbstractStateAggregator(final int sizeHint) {
+        this.behavior = new Starting<>(sizeHint);
+    }
+
+    protected final void addBuilder(final StateBuilder<S> builder) {
+        checkStarting().add(builder);
+    }
+
+    protected final synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
+        final Started<S> ret = checkStarting().start(function);
+        behavior = ret;
+        return ret;
+    }
+
+    protected final void receiveState(final StateBuilder<S> builder, final S state) {
+        behavior.receiveState(builder, state);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Starting<S> checkStarting() {
+        final Behavior<?, S> local = behavior;
+        Preconditions.checkState(local instanceof Starting, "Unexpected behavior %s", local);
+        return (Starting<S>) local;
+    }
+}
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/CompatDOMDataTreeListenerRegistry.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/CompatDOMDataTreeListenerRegistry.java
new file mode 100644 (file)
index 0000000..7ccf689
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Compatibility bridge between {@link DOMDataTreeListenerRegistry} and {@link DOMStoreTreeChangePublisher}.
+ */
+@Beta
+@Deprecated
+public final class CompatDOMDataTreeListenerRegistry implements DOMDataTreeListenerRegistry {
+
+    private final DOMStoreTreeChangePublisher publisher;
+
+    public CompatDOMDataTreeListenerRegistry(final DOMStoreTreeChangePublisher publisher) {
+        this.publisher = requireNonNull(publisher);
+    }
+
+    @Override
+    public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
+            final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges) {
+        if (subtrees.size() == 1) {
+            final DOMDataTreeIdentifier treeId = Iterables.getOnlyElement(subtrees);
+
+            final ListenerRegistration<?> reg = publisher.registerTreeChangeListener(treeId.getRootIdentifier(),
+                changes -> {
+                    final Optional<NormalizedNode<?, ?>> last = Iterables.getLast(changes).getRootNode().getDataAfter();
+                    if (last.isPresent()) {
+                        listener.onDataTreeChanged(changes, ImmutableMap.of(treeId, last.get()));
+                    } else {
+                        listener.onDataTreeChanged(changes, ImmutableMap.of());
+                    }
+                });
+            return new AbstractListenerRegistration<T>(listener) {
+                @Override
+                protected void removeRegistration() {
+                    reg.close();
+                }
+            };
+        }
+
+        final int size = subtrees.size();
+        final Collection<ListenerRegistration<?>> regs = new ArrayList<>(size);
+        final DOMDataTreeChangeListenerAggregator aggregator = new DOMDataTreeChangeListenerAggregator(size,
+            allowRxMerges);
+        for (DOMDataTreeIdentifier treeId : subtrees) {
+            regs.add(publisher.registerTreeChangeListener(treeId.getRootIdentifier(),
+                aggregator.createListener(treeId)));
+        }
+
+        return aggregator.start(listener, regs);
+    }
+}
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/CompatListenableDOMDataTreeShard.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/CompatListenableDOMDataTreeShard.java
new file mode 100644 (file)
index 0000000..00c7997
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ForwardingObject;
+import java.util.Collection;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compatibility layer between {@link DOMStoreTreeChangePublisher} and {@link ListenableDOMDataTreeShard}. Required
+ * for migration purposes.
+ *
+ * @author Robert Varga
+ *
+ * @deprecated This class is scheduled for removal when we remove compatibility with dom.spi.store APIs.
+ */
+@Deprecated
+public final class CompatListenableDOMDataTreeShard extends ForwardingObject implements ListenableDOMDataTreeShard {
+    private static final Logger LOG = LoggerFactory.getLogger(CompatListenableDOMDataTreeShard.class);
+
+    private final CompatDOMDataTreeListenerRegistry publisher;
+    private final DOMDataTreeShard delegate;
+
+    private CompatListenableDOMDataTreeShard(final DOMDataTreeShard delegate) {
+        this.delegate = requireNonNull(delegate);
+        checkArgument(delegate instanceof DOMStoreTreeChangePublisher);
+        this.publisher = new CompatDOMDataTreeListenerRegistry((DOMStoreTreeChangePublisher) delegate);
+    }
+
+    public static ListenableDOMDataTreeShard createIfNeeded(final DOMDataTreeShard delegate) {
+        if (delegate instanceof ListenableDOMDataTreeShard) {
+            return (ListenableDOMDataTreeShard) delegate;
+        }
+
+        LOG.debug("Using compatibility adaptor for {}", delegate);
+        return new CompatListenableDOMDataTreeShard(delegate);
+    }
+
+    @Override
+    protected DOMDataTreeShard delegate() {
+        return delegate;
+    }
+
+    @Override
+    public void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        delegate.onChildAttached(prefix, child);
+    }
+
+    @Override
+    public void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
+        delegate.onChildDetached(prefix, child);
+    }
+
+    @Override
+    public <L extends DOMDataTreeListener> ListenerRegistration<L> registerListener(final L listener,
+            final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges) {
+        return publisher.registerListener(listener, subtrees, allowRxMerges);
+    }
+}
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeChangeListenerAggregator.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeChangeListenerAggregator.java
new file mode 100644 (file)
index 0000000..97f3d28
--- /dev/null
@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A compatibility class for bridging DOMDataTreeChangeListener, which can listen on only single subtree with
+ * {@link DOMDataTreeListener} interface.
+ *
+ * @author Robert Varga
+ * @deprecated This class is scheduled for removal when we remove compatibility with dom.spi.store APIs.
+ */
+@Deprecated
+final class DOMDataTreeChangeListenerAggregator
+        extends AbstractStateAggregator<DOMDataTreeChangeListenerAggregator.State> {
+
+    static final class State extends AbstractStateAggregator.State implements Identifiable<DOMDataTreeIdentifier> {
+        private final DOMDataTreeIdentifier identifier;
+        final List<DataTreeCandidate> changes;
+
+        State(final DOMDataTreeIdentifier identifier, final List<DataTreeCandidate> changes) {
+            this.identifier = Preconditions.checkNotNull(identifier);
+            this.changes = Preconditions.checkNotNull(changes);
+        }
+
+        @Override
+        public DOMDataTreeIdentifier getIdentifier() {
+            return identifier;
+        }
+    }
+
+    private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
+        @GuardedBy("this")
+        private final List<DataTreeCandidate> changes = new ArrayList<>();
+        private final DOMDataTreeIdentifier identifier;
+
+        StateBuilder(final DOMDataTreeIdentifier identifier) {
+            this.identifier = Preconditions.checkNotNull(identifier);
+        }
+
+        @Override
+        protected synchronized void append(final State state) {
+            changes.addAll(state.changes);
+        }
+
+        @Override
+        protected synchronized void appendInitial(final State state) {
+            // We are still starting up, so all we need to do is squash reported changes to an initial write event
+            final DataTreeCandidate last = Iterables.getLast(state.changes);
+            changes.clear();
+            final Optional<NormalizedNode<?, ?>> lastData = last.getRootNode().getDataAfter();
+            if (lastData.isPresent()) {
+                changes.add(DataTreeCandidates.fromNormalizedNode(last.getRootPath(), lastData.get()));
+            }
+        }
+
+        @Override
+        public synchronized State build() {
+            final State ret = new State(identifier, ImmutableList.copyOf(changes));
+            changes.clear();
+            return ret;
+        }
+    }
+
+    private static final class Operational extends AbstractStateAggregator.Operational<State> {
+        private final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = new HashMap<>();
+        private final DOMDataTreeListener listener;
+
+        Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
+                final DOMDataTreeListener listener) {
+            super(builders);
+            this.listener = Preconditions.checkNotNull(listener);
+        }
+
+        @Override
+        protected void notifyListener(final Iterator<State> iterator) {
+            final Stopwatch clock = Stopwatch.createStarted();
+            final List<DataTreeCandidate> changes = new ArrayList<>();
+            while (iterator.hasNext()) {
+                final State state = iterator.next();
+                final List<DataTreeCandidate> candidates = state.changes;
+                if (!candidates.isEmpty()) {
+                    // Update current subtree snapshot based on last candidate node
+                    final DataTreeCandidateNode lastRoot = candidates.get(candidates.size() - 1).getRootNode();
+                    final Optional<NormalizedNode<?, ?>> optData = lastRoot.getDataAfter();
+                    if (optData.isPresent()) {
+                        subtrees.put(state.getIdentifier(), optData.get());
+                    } else {
+                        subtrees.remove(state.getIdentifier());
+                    }
+
+                    // Append changes
+                    changes.addAll(candidates);
+                }
+            }
+
+            final int size = changes.size();
+            if (size != 0) {
+                // Note: it is okay to leak changes, we must never leak mutable subtrees.
+                listener.onDataTreeChanged(changes, ImmutableMap.copyOf(subtrees));
+                LOG.trace("Listener {} processed {} changes in {}", listener, clock);
+            }
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeChangeListenerAggregator.class);
+
+    private final boolean allowRxMerges;
+
+    DOMDataTreeChangeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
+        super(sizeHint);
+        this.allowRxMerges = allowRxMerges;
+    }
+
+    DOMDataTreeChangeListener createListener(final DOMDataTreeIdentifier treeId) {
+        // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
+        final StateBuilder builder = new StateBuilder(treeId);
+        addBuilder(builder);
+
+        return changes -> receiveState(builder, new State(treeId, ImmutableList.copyOf(changes)));
+    }
+
+    <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
+            final Collection<ListenerRegistration<?>> regs) {
+        start(builders -> {
+            final Operational ret = new Operational(builders, listener);
+            ret.notifyListener(Iterators.transform(builders.iterator(), AbstractStateAggregator.StateBuilder::build));
+            return ret;
+        });
+
+        return new AbstractListenerRegistration<L>(listener) {
+            @Override
+            protected void removeRegistration() {
+                regs.forEach(ListenerRegistration::close);
+            }
+        };
+    }
+}
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeListenerAggregator.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeListenerAggregator.java
new file mode 100644 (file)
index 0000000..98b1905
--- /dev/null
@@ -0,0 +1,293 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Aggregator which combines multiple disjunct {@link DOMDataTreeListener} and forwards their changes to a central
+ * listener.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class DOMDataTreeListenerAggregator
+        extends AbstractStateAggregator<DOMDataTreeListenerAggregator.State> {
+
+    abstract static class State extends AbstractStateAggregator.State {
+
+    }
+
+    private static final class Aggregated extends State {
+        final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
+        final Collection<DOMDataTreeListeningException> failures;
+        final Collection<DataTreeCandidate> changes;
+
+        Aggregated(final Collection<DataTreeCandidate> changes,
+            final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
+            final Collection<DOMDataTreeListeningException> failures) {
+            this.changes = Preconditions.checkNotNull(changes);
+            this.subtrees = Preconditions.checkNotNull(subtrees);
+            this.failures = Preconditions.checkNotNull(failures);
+        }
+    }
+
+    private static final class Changes extends State {
+        final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees;
+        final Collection<DataTreeCandidate> changes;
+
+        Changes(final Collection<DataTreeCandidate> changes,
+            final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+            this.changes = Preconditions.checkNotNull(changes);
+            this.subtrees = Preconditions.checkNotNull(subtrees);
+        }
+    }
+
+    private static final class Failure extends State {
+        final Collection<DOMDataTreeListeningException> causes;
+
+        Failure(final Collection<DOMDataTreeListeningException> causes) {
+            this.causes = Preconditions.checkNotNull(causes);
+        }
+    }
+
+    private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
+        @GuardedBy("this")
+        private final Collection<DOMDataTreeListeningException> causes = new ArrayList<>(0);
+        @GuardedBy("this")
+        private final Collection<DataTreeCandidate> changes = new ArrayList<>();
+        @GuardedBy("this")
+        private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.of();
+
+        @Override
+        protected void append(final State state) {
+            if (state instanceof Changes) {
+                final Changes changes = (Changes) state;
+                this.changes.addAll(changes.changes);
+                subtrees = ImmutableMap.copyOf(changes.subtrees);
+            } else if (state instanceof Failure) {
+                causes.addAll(((Failure) state).causes);
+            } else {
+                throw new IllegalStateException("Unexpected state " + state);
+            }
+        }
+
+        @Override
+        protected synchronized void appendInitial(final State state) {
+            // TODO: we could index and compress state changes here
+            if (state instanceof Changes) {
+                final Changes changes = (Changes) state;
+                this.changes.addAll(changes.changes);
+                subtrees = ImmutableMap.copyOf(changes.subtrees);
+            } else if (state instanceof Failure) {
+                causes.addAll(((Failure) state).causes);
+            } else {
+                throw new IllegalStateException("Unexpected state " + state);
+            }
+        }
+
+        @Override
+        public synchronized Aggregated build() {
+            final Aggregated ret = new Aggregated(ImmutableList.copyOf(changes), subtrees,
+                ImmutableList.copyOf(causes));
+            changes.clear();
+            causes.clear();
+            return ret;
+        }
+    }
+
+    private static final class Operational extends AbstractStateAggregator.Operational<State> {
+        private final DOMDataTreeListener listener;
+        private boolean failed;
+
+        Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
+                final DOMDataTreeListener listener) {
+            super(builders);
+            this.listener = Preconditions.checkNotNull(listener);
+        }
+
+        @Override
+        protected void notifyListener(final Iterator<State> iterator) {
+            if (failed) {
+                iterator.forEachRemaining(state -> LOG.debug("Listener {} failed, ignoring state {}", state));
+                return;
+            }
+
+            final Stopwatch clock = Stopwatch.createStarted();
+            final List<DataTreeCandidate> changes = new ArrayList<>();
+            final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
+            final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
+            while (iterator.hasNext()) {
+                collectState(iterator.next(), changes, subtrees, failures);
+            }
+
+            if (!changes.isEmpty()) {
+                // Note: it is okay to leak changes, we must never leak mutable subtrees.
+                callListener(listener, changes, subtrees.build());
+            }
+            if (!failures.isEmpty()) {
+                failed = true;
+                listener.onDataTreeFailed(failures);
+            }
+
+            LOG.trace("Listener {} notification completed in {}", listener, clock);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeListenerAggregator.class);
+
+    // Because a component listener may report a failure before we finish registering all listeners, we need a way
+    // to trigger a failure report from the thread *not* performing the registration.
+    private static final Executor FAILURE_NOTIFIER;
+
+    static {
+        final ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat(DOMDataTreeListenerAggregator.class.getSimpleName() + "-failure-%s");
+        FAILURE_NOTIFIER = Executors.newSingleThreadExecutor(tfb.build());
+    }
+
+    private final boolean allowRxMerges;
+
+    public DOMDataTreeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
+        super(sizeHint);
+        this.allowRxMerges = allowRxMerges;
+    }
+
+    public static <L extends DOMDataTreeListener, T> ListenerRegistration<L> aggregateIfNeeded(final L listener,
+            final Map<T, Collection<DOMDataTreeIdentifier>> subtrees, final boolean allowRxMerges,
+            final Function<T, DOMDataTreeShard> keyToShard) {
+        if (subtrees.size() == 1) {
+            final Entry<T, Collection<DOMDataTreeIdentifier>> entry = subtrees.entrySet().iterator()
+                    .next();
+            return CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
+                    .registerListener(listener, entry.getValue(), allowRxMerges);
+        }
+
+        // Alright, this the real deal, we have to aggregate.
+        final int size = subtrees.size();
+        final DOMDataTreeListenerAggregator aggregator = new DOMDataTreeListenerAggregator(size, allowRxMerges);
+        final Collection<ListenerRegistration<DOMDataTreeListener>> regs = new ArrayList<>(size);
+        for (Entry<T, Collection<DOMDataTreeIdentifier>> entry : subtrees.entrySet()) {
+            regs.add(CompatListenableDOMDataTreeShard.createIfNeeded(keyToShard.apply(entry.getKey()))
+                .registerListener(aggregator.createListener(), entry.getValue(), allowRxMerges));
+        }
+
+        return aggregator.start(listener, regs);
+    }
+
+    public DOMDataTreeListener createListener() {
+        // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
+        final StateBuilder builder = new StateBuilder();
+        addBuilder(builder);
+
+        return new DOMDataTreeListener() {
+            @Override
+            public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
+                receiveState(builder, new Failure(causes));
+            }
+
+            @Override
+            public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
+                    final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+                receiveState(builder, new Changes(changes, subtrees));
+            }
+        };
+    }
+
+    public <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
+            final Collection<ListenerRegistration<DOMDataTreeListener>> regs) {
+
+        final Started<State> result = start(builders -> start(listener, regs, builders));
+        if (result instanceof Failed) {
+            return new AbstractListenerRegistration<L>(listener) {
+                @Override
+                protected void removeRegistration() {
+                    // Listeners have already been closed, this is a no-op
+                }
+            };
+        }
+
+        return new AbstractListenerRegistration<L>(listener) {
+            @Override
+            protected void removeRegistration() {
+                regs.forEach(ListenerRegistration::close);
+            }
+        };
+    }
+
+    static Started<State> start(final DOMDataTreeListener listener,
+            final Collection<ListenerRegistration<DOMDataTreeListener>> regs,
+            final Collection<AbstractStateAggregator.StateBuilder<State>> builders) {
+
+        final List<DataTreeCandidate> changes = new ArrayList<>();
+        final List<DOMDataTreeListeningException> failures = new ArrayList<>(0);
+        final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = ImmutableMap.builder();
+        for (AbstractStateAggregator.StateBuilder<State> builder : builders) {
+            collectState(builder.build(), changes, subtrees, failures);
+        }
+
+        if (!failures.isEmpty()) {
+            regs.forEach(ListenerRegistration::close);
+            FAILURE_NOTIFIER.execute(() -> listener.onDataTreeFailed(failures));
+            return new Failed<>(builders);
+        }
+        if (!changes.isEmpty()) {
+            callListener(listener, changes, subtrees.build());
+        }
+
+        return new Operational(builders, listener);
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    static void callListener(final DOMDataTreeListener listener, final Collection<DataTreeCandidate> changes,
+            final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
+        try {
+            listener.onDataTreeChanged(changes, subtrees);
+        } catch (Exception e) {
+            LOG.error("Listener {} failed to process initial changes", listener, e);
+        }
+    }
+
+    static void collectState(final State state, final Collection<DataTreeCandidate> changes,
+            final Builder<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees,
+            final Collection<DOMDataTreeListeningException> failures) {
+        Verify.verify(state instanceof Aggregated, "Unexpected state %s", state);
+        final Aggregated aggregated = (Aggregated) state;
+
+        subtrees.putAll(aggregated.subtrees);
+        changes.addAll(aggregated.changes);
+        failures.addAll(aggregated.failures);
+    }
+}
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeListenerRegistry.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/DOMDataTreeListenerRegistry.java
new file mode 100644 (file)
index 0000000..ac11e09
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Utility interface for objects dispatching events to {@link DOMDataTreeListener}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface DOMDataTreeListenerRegistry {
+
+    @Nonnull <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(@Nonnull T listener,
+            @Nonnull Collection<DOMDataTreeIdentifier> subtrees, boolean allowRxMerges);
+}
diff --git a/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/ListenableDOMDataTreeShard.java b/dom/mdsal-dom-spi/src/main/java/org/opendaylight/mdsal/dom/spi/shard/ListenableDOMDataTreeShard.java
new file mode 100644 (file)
index 0000000..03b9cf6
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.dom.spi.shard;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+
+/**
+ * A {@link DOMDataTreeShard} which allows registration of listeners, allowing realization of the DOMDataTreeService's
+ * registerListener contract. Note that producer/consumer as well as the logical data store type are taken care of
+ * by the caller, hence implementations of this interface only need to take care of communicating with their subshards.
+ */
+@Beta
+public interface ListenableDOMDataTreeShard extends DOMDataTreeShard, DOMDataTreeListenerRegistry {
+
+}
index ed8a190bbd8ab11fa131557ac6008d2b258c6004..84633e962f3719cac221cb4fae15323d71c28d2e 100644 (file)
@@ -13,7 +13,10 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
 
 /**
  * Marker interface for readable/writeable DOMDataTreeShard.
+ *
+ * @deprecated Use {@link ListenableDOMDataTreeShard} instead.
  */
+@Deprecated
 @Beta
 public interface ReadableWriteableDOMDataTreeShard extends DOMStoreTreeChangePublisher, WriteableDOMDataTreeShard {
 }
index bd3bb0facfa0b4c68411cad7d8f0dab592d537b2..dd6c955d74eca2991c936c657daa07c79da19355 100644 (file)
@@ -21,6 +21,7 @@ public interface WriteableDOMDataTreeShard extends DOMDataTreeShard {
 
     /**
      * Create a producer that has the ability to write into the provided subtrees.
+     *
      * @param paths Subtrees that the caller wants to write into.
      * @return Producer.
      */