BUG-8733: switch to using DOMDataTreeListener-based APIs 72/60772/20
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 25 Jul 2017 23:43:15 +0000 (01:43 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 31 Jul 2017 12:34:54 +0000 (14:34 +0200)
This patch switches ShardedDOMDataTree to use
ListenableDOMDataTreeShard, performing adaptation and aggregation
only when needed.

The end result is that a DOMDataTreeListeners affected only by
a single ListenableDOMDataTreeShard are passed directly to that
instance, allowing for efficient event delivery.

In case a registration spans multiple shards, we register a listener
with each and use DOMDataTreeListenerAggregator to efficiently
merge the callbacks.

Change-Id: I0a879b45b2389d8def5ab824ab29dfbccc2b4f86
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java [deleted file]
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java

index 76df2318e831622cef06013961c2afe77c4006f4..21d160d79a5f4e0fcf9e2bb54121c80d75d5a9f3 100644 (file)
@@ -8,7 +8,11 @@
 package org.opendaylight.mdsal.dom.broker;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -25,11 +29,16 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
+import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeListenerAggregator;
+import org.opendaylight.mdsal.dom.spi.shard.ListenableDOMDataTreeShard;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTree.class);
 
     @GuardedBy("this")
     private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<?>> shards = DOMDataTreePrefixTable.create();
@@ -171,40 +180,79 @@ public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTree
             final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
             final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
         Preconditions.checkNotNull(listener, "listener");
-        Preconditions.checkArgument(!subtrees.isEmpty(), "Subtrees must not be empty.");
-        final ShardedDOMDataTreeListenerContext<T> listenerContext =
-                ShardedDOMDataTreeListenerContext.create(listener);
-        try {
-            // FIXME: Add attachment of producers
-            for (final DOMDataTreeProducer producer : producers) {
-                Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
-                final ShardedDOMDataTreeProducer castedProducer = (ShardedDOMDataTreeProducer) producer;
-                simpleLoopCheck(subtrees, castedProducer.getSubtrees());
-                // FIXME: We should also unbound listeners
-                castedProducer.bindToListener(listenerContext);
-            }
 
-            for (final DOMDataTreeIdentifier subtree : subtrees) {
-                final DOMDataTreeShard shard = shards.lookup(subtree).getValue().getInstance();
-                // FIXME: What should we do if listener is wildcard? And shards are on per
-                // node basis?
-                Preconditions.checkArgument(shard instanceof DOMStoreTreeChangePublisher,
-                        "Subtree %s does not point to listenable subtree.", subtree);
+        // Cross-check specified trees for exclusivity and eliminate duplicates, noDupSubtrees is effectively a Set
+        final Collection<DOMDataTreeIdentifier> noDupSubtrees;
+        switch (subtrees.size()) {
+            case 0:
+                throw new IllegalArgumentException("Subtrees must not be empty.");
+            case 1:
+                noDupSubtrees = subtrees;
+                break;
+            default:
+                // Check subtrees for mutual inclusion, this is an O(N**2) operation
+                for (DOMDataTreeIdentifier toCheck : subtrees) {
+                    for (DOMDataTreeIdentifier against : subtrees) {
+                        if (!toCheck.equals(against)) {
+                            Preconditions.checkArgument(!toCheck.contains(against), "Subtree %s contains subtree %s",
+                                toCheck, against);
+                        }
+                    }
+                }
 
-                listenerContext.register(subtree, (DOMStoreTreeChangePublisher) shard);
-            }
-        } catch (final Exception e) {
-            listenerContext.close();
-            throw e;
+                noDupSubtrees = ImmutableSet.copyOf(subtrees);
+        }
+
+        LOG.trace("Requested registration of listener {} to subtrees {}", listener, noDupSubtrees);
+
+        // Lookup shards corresponding to subtrees and construct a map of which subtrees we want from which shard
+        final ListMultimap<DOMDataTreeShardRegistration<?>, DOMDataTreeIdentifier> needed =
+                ArrayListMultimap.create();
+        for (final DOMDataTreeIdentifier subtree : subtrees) {
+            final DOMDataTreeShardRegistration<?> reg = Verify.verifyNotNull(shards.lookup(subtree).getValue());
+            needed.put(reg, subtree);
+        }
+
+        LOG.trace("Listener {} is attaching to shards {}", listener, needed);
+
+        // Sanity check: all selected shards have to support one of the listening interfaces
+        needed.asMap().forEach((reg, trees) -> {
+            final DOMDataTreeShard shard = reg.getInstance();
+            Preconditions.checkArgument(shard instanceof ListenableDOMDataTreeShard
+                || shard instanceof DOMStoreTreeChangePublisher, "Subtrees %s do not point to listenable subtree.",
+                trees);
+        });
+
+        // Sanity check: all producers have to come from this implementation and must not form loops
+        for (DOMDataTreeProducer producer : producers) {
+            Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer);
+            simpleLoopCheck(subtrees, ((ShardedDOMDataTreeProducer) producer).getSubtrees());
         }
+
+        final ListenerRegistration<?> underlyingRegistration = createRegisteredListener(listener, needed.asMap(),
+            allowRxMerges, producers);
         return new AbstractListenerRegistration<T>(listener) {
             @Override
             protected void removeRegistration() {
-                ShardedDOMDataTree.this.removeListener(listenerContext);
+                ShardedDOMDataTree.this.removeListener(listener);
+                underlyingRegistration.close();
             }
         };
     }
 
+    private static ListenerRegistration<?> createRegisteredListener(final DOMDataTreeListener userListener,
+            final Map<DOMDataTreeShardRegistration<?>, Collection<DOMDataTreeIdentifier>> needed,
+            final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers) {
+        // FIXME: Add attachment of producers
+        for (final DOMDataTreeProducer producer : producers) {
+            // FIXME: We should also unbound listeners
+            ((ShardedDOMDataTreeProducer) producer).bindToListener(userListener);
+        }
+
+        return DOMDataTreeListenerAggregator.aggregateIfNeeded(userListener, needed, allowRxMerges,
+            DOMDataTreeShardRegistration::getInstance);
+    }
+
     private static void simpleLoopCheck(final Collection<DOMDataTreeIdentifier> listen,
             final Set<DOMDataTreeIdentifier> writes) throws DOMDataTreeLoopException {
         for (final DOMDataTreeIdentifier listenPath : listen) {
@@ -222,8 +270,7 @@ public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTree
         }
     }
 
-    void removeListener(final ShardedDOMDataTreeListenerContext<?> listener) {
+    void removeListener(final DOMDataTreeListener listener) {
         // FIXME: detach producers
-        listener.close();
     }
 }
diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java
deleted file mode 100644 (file)
index 5713bec..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.mdsal.dom.broker;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.MapAdaptor;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-@Deprecated
-class ShardedDOMDataTreeListenerContext<T extends DOMDataTreeListener> implements AutoCloseable {
-
-    private final DOMDataTreeListener listener;
-    private final EnumMap<LogicalDatastoreType, StoreListener> storeListeners = new EnumMap<>(
-            LogicalDatastoreType.class);
-    private final Collection<ListenerRegistration<?>> registrations = new ArrayList<>();
-
-    // FIXME: Probably should be encapsulated into state object
-    @GuardedBy("this")
-    private Collection<DataTreeCandidate> unreported = new ArrayList<>();
-    @GuardedBy("this")
-    private Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> currentData = Collections.emptyMap();
-
-    private ShardedDOMDataTreeListenerContext(final T listener) {
-        for (LogicalDatastoreType type : LogicalDatastoreType.values()) {
-            storeListeners.put(type, new StoreListener(type));
-        }
-        this.listener = Preconditions.checkNotNull(listener, "listener");
-    }
-
-    static <T extends DOMDataTreeListener> ShardedDOMDataTreeListenerContext<T> create(final T listener) {
-        return new ShardedDOMDataTreeListenerContext<>(listener);
-    }
-
-    synchronized void notifyListener() {
-        Collection<DataTreeCandidate> changesToNotify = unreported;
-        unreported = new ArrayList<>();
-        listener.onDataTreeChanged(changesToNotify, currentData);
-    }
-
-    void register(final DOMDataTreeIdentifier subtree, final DOMStoreTreeChangePublisher shard) {
-        ListenerRegistration<?> storeReg =
-                shard.registerTreeChangeListener(subtree.getRootIdentifier(),
-                        storeListeners.get(subtree.getDatastoreType()));
-        registrations.add(storeReg);
-    }
-
-    private final class StoreListener implements DOMDataTreeChangeListener {
-
-        private final LogicalDatastoreType type;
-
-        StoreListener(final LogicalDatastoreType type) {
-            this.type = type;
-        }
-
-        @Override
-        public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
-            receivedDataTreeChanges(type, changes);
-            scheduleNotification();
-        }
-
-    }
-
-    // FIXME: Should be able to run parallel to notifyListener and should honor
-    // allowRxMerges
-    synchronized void receivedDataTreeChanges(final LogicalDatastoreType type,
-            final Collection<DataTreeCandidate> changes) {
-        Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> updatedData =
-                MapAdaptor.getDefaultInstance().takeSnapshot(currentData);
-        for (DataTreeCandidate change : changes) {
-            // FIXME: Make sure only one is reported / merged
-            unreported.add(change);
-            DOMDataTreeIdentifier treeId = new DOMDataTreeIdentifier(type, change.getRootPath());
-            // FIXME: Probably we should apply data tree candidate to previously observed state
-            Optional<NormalizedNode<?, ?>> dataAfter = change.getRootNode().getDataAfter();
-            if (dataAfter.isPresent()) {
-                updatedData.put(treeId, dataAfter.get());
-            } else {
-                updatedData.remove(treeId);
-            }
-        }
-        currentData = MapAdaptor.getDefaultInstance().optimize(updatedData);
-    }
-
-    void scheduleNotification() {
-        // FIXME: This callout should schedule delivery task
-        notifyListener();
-    }
-
-    @Override
-    public void close() {
-        for (ListenerRegistration<?> reg : registrations) {
-            reg.close();
-        }
-    }
-
-    DOMDataTreeListener getListener() {
-        return listener;
-    }
-}
index b7c2e58e523a653a1135bf5be87ceb9f36b44c16..87f321db663c9b203f507e086a7ce7a874502ab2 100644 (file)
@@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerBusyException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
@@ -50,7 +51,7 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
             AtomicIntegerFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, "closed");
     private volatile int closed;
 
-    private volatile ShardedDOMDataTreeListenerContext<?> attachedListener;
+    private volatile DOMDataTreeListener attachedListener;
     private volatile ProducerLayout layout;
 
     private ShardedDOMDataTreeProducer(final ShardedDOMDataTree dataTree,
@@ -268,15 +269,9 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer {
         }
     }
 
-    void bindToListener(final ShardedDOMDataTreeListenerContext<?> listener) {
-        Preconditions.checkNotNull(listener);
-
-        final ShardedDOMDataTreeListenerContext<?> local = attachedListener;
-        if (local != null) {
-            throw new IllegalStateException(String.format("Producer %s is already attached to listener %s", this,
-                local.getListener()));
-        }
-
-        this.attachedListener = listener;
+    void bindToListener(final DOMDataTreeListener listener) {
+        final DOMDataTreeListener local = attachedListener;
+        Preconditions.checkState(local == null, "Producer %s is already attached to listener %s", this, local);
+        this.attachedListener = Preconditions.checkNotNull(listener);
     }
 }