From 5281fa941604e5234e8543bc17267ad4f540e669 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 26 Jul 2017 01:43:15 +0200 Subject: [PATCH 1/1] BUG-8733: switch to using DOMDataTreeListener-based APIs This patch switches ShardedDOMDataTree to use ListenableDOMDataTreeShard, performing adaptation and aggregation only when needed. The end result is that a DOMDataTreeListeners affected only by a single ListenableDOMDataTreeShard are passed directly to that instance, allowing for efficient event delivery. In case a registration spans multiple shards, we register a listener with each and use DOMDataTreeListenerAggregator to efficiently merge the callbacks. Change-Id: I0a879b45b2389d8def5ab824ab29dfbccc2b4f86 Signed-off-by: Robert Varga --- .../mdsal/dom/broker/ShardedDOMDataTree.java | 99 +++++++++++---- .../ShardedDOMDataTreeListenerContext.java | 118 ------------------ .../broker/ShardedDOMDataTreeProducer.java | 17 +-- 3 files changed, 79 insertions(+), 155 deletions(-) delete mode 100644 dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java index 76df2318e8..21d160d79a 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java @@ -8,7 +8,11 @@ package org.opendaylight.mdsal.dom.broker; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.ListMultimap; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -25,11 +29,16 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService; import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable; import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry; +import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeListenerAggregator; +import org.opendaylight.mdsal.dom.spi.shard.ListenableDOMDataTreeShard; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService { + private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTree.class); @GuardedBy("this") private final DOMDataTreePrefixTable> shards = DOMDataTreePrefixTable.create(); @@ -171,40 +180,79 @@ public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTree final Collection subtrees, final boolean allowRxMerges, final Collection producers) throws DOMDataTreeLoopException { Preconditions.checkNotNull(listener, "listener"); - Preconditions.checkArgument(!subtrees.isEmpty(), "Subtrees must not be empty."); - final ShardedDOMDataTreeListenerContext listenerContext = - ShardedDOMDataTreeListenerContext.create(listener); - try { - // FIXME: Add attachment of producers - for (final DOMDataTreeProducer producer : producers) { - Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer); - final ShardedDOMDataTreeProducer castedProducer = (ShardedDOMDataTreeProducer) producer; - simpleLoopCheck(subtrees, castedProducer.getSubtrees()); - // FIXME: We should also unbound listeners - castedProducer.bindToListener(listenerContext); - } - for (final DOMDataTreeIdentifier subtree : subtrees) { - final DOMDataTreeShard shard = shards.lookup(subtree).getValue().getInstance(); - // FIXME: What should we do if listener is wildcard? And shards are on per - // node basis? - Preconditions.checkArgument(shard instanceof DOMStoreTreeChangePublisher, - "Subtree %s does not point to listenable subtree.", subtree); + // Cross-check specified trees for exclusivity and eliminate duplicates, noDupSubtrees is effectively a Set + final Collection noDupSubtrees; + switch (subtrees.size()) { + case 0: + throw new IllegalArgumentException("Subtrees must not be empty."); + case 1: + noDupSubtrees = subtrees; + break; + default: + // Check subtrees for mutual inclusion, this is an O(N**2) operation + for (DOMDataTreeIdentifier toCheck : subtrees) { + for (DOMDataTreeIdentifier against : subtrees) { + if (!toCheck.equals(against)) { + Preconditions.checkArgument(!toCheck.contains(against), "Subtree %s contains subtree %s", + toCheck, against); + } + } + } - listenerContext.register(subtree, (DOMStoreTreeChangePublisher) shard); - } - } catch (final Exception e) { - listenerContext.close(); - throw e; + noDupSubtrees = ImmutableSet.copyOf(subtrees); + } + + LOG.trace("Requested registration of listener {} to subtrees {}", listener, noDupSubtrees); + + // Lookup shards corresponding to subtrees and construct a map of which subtrees we want from which shard + final ListMultimap, DOMDataTreeIdentifier> needed = + ArrayListMultimap.create(); + for (final DOMDataTreeIdentifier subtree : subtrees) { + final DOMDataTreeShardRegistration reg = Verify.verifyNotNull(shards.lookup(subtree).getValue()); + needed.put(reg, subtree); + } + + LOG.trace("Listener {} is attaching to shards {}", listener, needed); + + // Sanity check: all selected shards have to support one of the listening interfaces + needed.asMap().forEach((reg, trees) -> { + final DOMDataTreeShard shard = reg.getInstance(); + Preconditions.checkArgument(shard instanceof ListenableDOMDataTreeShard + || shard instanceof DOMStoreTreeChangePublisher, "Subtrees %s do not point to listenable subtree.", + trees); + }); + + // Sanity check: all producers have to come from this implementation and must not form loops + for (DOMDataTreeProducer producer : producers) { + Preconditions.checkArgument(producer instanceof ShardedDOMDataTreeProducer); + simpleLoopCheck(subtrees, ((ShardedDOMDataTreeProducer) producer).getSubtrees()); } + + final ListenerRegistration underlyingRegistration = createRegisteredListener(listener, needed.asMap(), + allowRxMerges, producers); return new AbstractListenerRegistration(listener) { @Override protected void removeRegistration() { - ShardedDOMDataTree.this.removeListener(listenerContext); + ShardedDOMDataTree.this.removeListener(listener); + underlyingRegistration.close(); } }; } + private static ListenerRegistration createRegisteredListener(final DOMDataTreeListener userListener, + final Map, Collection> needed, + final boolean allowRxMerges, final Collection producers) { + // FIXME: Add attachment of producers + for (final DOMDataTreeProducer producer : producers) { + // FIXME: We should also unbound listeners + ((ShardedDOMDataTreeProducer) producer).bindToListener(userListener); + } + + return DOMDataTreeListenerAggregator.aggregateIfNeeded(userListener, needed, allowRxMerges, + DOMDataTreeShardRegistration::getInstance); + } + private static void simpleLoopCheck(final Collection listen, final Set writes) throws DOMDataTreeLoopException { for (final DOMDataTreeIdentifier listenPath : listen) { @@ -222,8 +270,7 @@ public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTree } } - void removeListener(final ShardedDOMDataTreeListenerContext listener) { + void removeListener(final DOMDataTreeListener listener) { // FIXME: detach producers - listener.close(); } } diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java deleted file mode 100644 index 5713bece9b..0000000000 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeListenerContext.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.mdsal.dom.broker; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumMap; -import java.util.Map; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; -import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.util.MapAdaptor; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; - -@Deprecated -class ShardedDOMDataTreeListenerContext implements AutoCloseable { - - private final DOMDataTreeListener listener; - private final EnumMap storeListeners = new EnumMap<>( - LogicalDatastoreType.class); - private final Collection> registrations = new ArrayList<>(); - - // FIXME: Probably should be encapsulated into state object - @GuardedBy("this") - private Collection unreported = new ArrayList<>(); - @GuardedBy("this") - private Map> currentData = Collections.emptyMap(); - - private ShardedDOMDataTreeListenerContext(final T listener) { - for (LogicalDatastoreType type : LogicalDatastoreType.values()) { - storeListeners.put(type, new StoreListener(type)); - } - this.listener = Preconditions.checkNotNull(listener, "listener"); - } - - static ShardedDOMDataTreeListenerContext create(final T listener) { - return new ShardedDOMDataTreeListenerContext<>(listener); - } - - synchronized void notifyListener() { - Collection changesToNotify = unreported; - unreported = new ArrayList<>(); - listener.onDataTreeChanged(changesToNotify, currentData); - } - - void register(final DOMDataTreeIdentifier subtree, final DOMStoreTreeChangePublisher shard) { - ListenerRegistration storeReg = - shard.registerTreeChangeListener(subtree.getRootIdentifier(), - storeListeners.get(subtree.getDatastoreType())); - registrations.add(storeReg); - } - - private final class StoreListener implements DOMDataTreeChangeListener { - - private final LogicalDatastoreType type; - - StoreListener(final LogicalDatastoreType type) { - this.type = type; - } - - @Override - public void onDataTreeChanged(final Collection changes) { - receivedDataTreeChanges(type, changes); - scheduleNotification(); - } - - } - - // FIXME: Should be able to run parallel to notifyListener and should honor - // allowRxMerges - synchronized void receivedDataTreeChanges(final LogicalDatastoreType type, - final Collection changes) { - Map> updatedData = - MapAdaptor.getDefaultInstance().takeSnapshot(currentData); - for (DataTreeCandidate change : changes) { - // FIXME: Make sure only one is reported / merged - unreported.add(change); - DOMDataTreeIdentifier treeId = new DOMDataTreeIdentifier(type, change.getRootPath()); - // FIXME: Probably we should apply data tree candidate to previously observed state - Optional> dataAfter = change.getRootNode().getDataAfter(); - if (dataAfter.isPresent()) { - updatedData.put(treeId, dataAfter.get()); - } else { - updatedData.remove(treeId); - } - } - currentData = MapAdaptor.getDefaultInstance().optimize(updatedData); - } - - void scheduleNotification() { - // FIXME: This callout should schedule delivery task - notifyListener(); - } - - @Override - public void close() { - for (ListenerRegistration reg : registrations) { - reg.close(); - } - } - - DOMDataTreeListener getListener() { - return listener; - } -} diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java index b7c2e58e52..87f321db66 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducer.java @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerBusyException; import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException; @@ -50,7 +51,7 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { AtomicIntegerFieldUpdater.newUpdater(ShardedDOMDataTreeProducer.class, "closed"); private volatile int closed; - private volatile ShardedDOMDataTreeListenerContext attachedListener; + private volatile DOMDataTreeListener attachedListener; private volatile ProducerLayout layout; private ShardedDOMDataTreeProducer(final ShardedDOMDataTree dataTree, @@ -268,15 +269,9 @@ class ShardedDOMDataTreeProducer implements DOMDataTreeProducer { } } - void bindToListener(final ShardedDOMDataTreeListenerContext listener) { - Preconditions.checkNotNull(listener); - - final ShardedDOMDataTreeListenerContext local = attachedListener; - if (local != null) { - throw new IllegalStateException(String.format("Producer %s is already attached to listener %s", this, - local.getListener())); - } - - this.attachedListener = listener; + void bindToListener(final DOMDataTreeListener listener) { + final DOMDataTreeListener local = attachedListener; + Preconditions.checkState(local == null, "Producer %s is already attached to listener %s", this, local); + this.attachedListener = Preconditions.checkNotNull(listener); } } -- 2.36.6