From 4312c58d37c9e1ad27d6cf228eda0ba3f7501a0c Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 10 Apr 2014 12:28:24 +0200 Subject: [PATCH] BUG-509: Encapsulate ListenerTree This patch encapsulates the ListenerTree. This provides the infrastructure needed to make listener registration/unregistration scale and work in face of asynchronous commits. Whenever a listener is about to be registered, the tree is write-locked, the apropriate structure is created and the registration is enqueued. When a registration is closed, the removeListener() callback re-takes the appropriate lock and removes the listener. When the last listener is removed from a node, and that node is unused, it is cascaded up to its parent for removal, which recursively does the same. This means that the tree is of unused nodes as soon as they are freed. The backreference to parent is kept weak, which makes sure we are able to finalize as much as we can in case of funky references lying around. Write locks are never leaked outside of the ListenerTree class, e.g. they are always released before we return to the caller. When a transaction is being committed, the ListenerTree is asked for a Walker, which is an AutoCloseable resource. A Walker is the only way how foreign code can get hold of a listener tree root node. Each walker is backed by a reference to a read-lock, e.g. listener registration/unregistration cannot occur while such a lock is held. This makes sure the commit process sees a point-in-time view of the listener tree. When a Walker is closed, the tree lock is read-unlocked. The close() method is idempotent. Change-Id: I6ea9b2e00f8ab21a5d8bd3b8c82c193c2069d683 Signed-off-by: Robert Varga --- .../store/impl/DataChangeEventResolver.java | 38 +-- .../dom/store/impl/InMemoryDOMDataStore.java | 13 +- .../impl/tree/ListenerRegistrationNode.java | 154 ------------ .../sal/dom/store/impl/tree/ListenerTree.java | 233 ++++++++++++++++++ 4 files changed, 258 insertions(+), 180 deletions(-) delete mode 100644 opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java create mode 100644 opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerTree.java diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java index f231bb5c39..df2725d020 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java @@ -10,7 +10,8 @@ import java.util.Set; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker; import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; @@ -28,7 +29,7 @@ public class DataChangeEventResolver { private static final DOMImmutableDataChangeEvent NO_CHANGE = builder().build(); private final ImmutableList.Builder tasks = ImmutableList.builder(); private InstanceIdentifier rootPath; - private ListenerRegistrationNode listenerRoot; + private ListenerTree listenerRoot; private NodeModification modificationRoot; private Optional beforeRoot; private Optional afterRoot; @@ -42,11 +43,11 @@ public class DataChangeEventResolver { return this; } - protected ListenerRegistrationNode getListenerRoot() { + protected ListenerTree getListenerRoot() { return listenerRoot; } - protected DataChangeEventResolver setListenerRoot(final ListenerRegistrationNode listenerRoot) { + protected DataChangeEventResolver setListenerRoot(final ListenerTree listenerRoot) { this.listenerRoot = listenerRoot; return this; } @@ -79,13 +80,16 @@ public class DataChangeEventResolver { } public Iterable resolve() { - LOG.trace("Resolving events for {}" ,modificationRoot); - resolveAnyChangeEvent(rootPath, Optional.of(listenerRoot), modificationRoot, beforeRoot, afterRoot); - return tasks.build(); + LOG.trace("Resolving events for {}", modificationRoot); + + try (final Walker w = listenerRoot.getWalker()) { + resolveAnyChangeEvent(rootPath, Optional.of(w.getRootNode()), modificationRoot, beforeRoot, afterRoot); + return tasks.build(); + } } private DOMImmutableDataChangeEvent resolveAnyChangeEvent(final InstanceIdentifier path, - final Optional listeners, final NodeModification modification, + final Optional listeners, final NodeModification modification, final Optional before, final Optional after) { // No listeners are present in listener registration subtree // no before and after state is present @@ -119,13 +123,13 @@ public class DataChangeEventResolver { * @return */ private DOMImmutableDataChangeEvent resolveCreateEvent(final InstanceIdentifier path, - final Optional listeners, final StoreMetadataNode afterState) { + final Optional listeners, final StoreMetadataNode afterState) { final NormalizedNode node = afterState.getData(); Builder builder = builder().setAfter(node).addCreated(path, node); for (StoreMetadataNode child : afterState.getChildren()) { PathArgument childId = child.getIdentifier(); - Optional childListeners = getChild(listeners, childId); + Optional childListeners = getChild(listeners, childId); InstanceIdentifier childPath = StoreUtils.append(path, childId); builder.merge(resolveCreateEvent(childPath, childListeners, child)); @@ -135,13 +139,13 @@ public class DataChangeEventResolver { } private DOMImmutableDataChangeEvent resolveDeleteEvent(final InstanceIdentifier path, - final Optional listeners, final StoreMetadataNode beforeState) { + final Optional listeners, final StoreMetadataNode beforeState) { final NormalizedNode node = beforeState.getData(); Builder builder = builder().setBefore(node).addRemoved(path, node); for (StoreMetadataNode child : beforeState.getChildren()) { PathArgument childId = child.getIdentifier(); - Optional childListeners = getChild(listeners, childId); + Optional childListeners = getChild(listeners, childId); InstanceIdentifier childPath = StoreUtils.append(path, childId); builder.merge(resolveDeleteEvent(childPath, childListeners, child)); } @@ -149,7 +153,7 @@ public class DataChangeEventResolver { } private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final InstanceIdentifier path, - final Optional listeners, final NodeModification modification, + final Optional listeners, final NodeModification modification, final StoreMetadataNode before, final StoreMetadataNode after) { Builder one = builder().setBefore(before.getData()).setAfter(after.getData()); @@ -159,7 +163,7 @@ public class DataChangeEventResolver { for (NodeModification childMod : modification.getModifications()) { PathArgument childId = childMod.getIdentifier(); InstanceIdentifier childPath = append(path, childId); - Optional childListen = getChild(listeners, childId); + Optional childListen = getChild(listeners, childId); Optional childBefore = before.getChild(childId); Optional childAfter = after.getChild(childId); @@ -189,13 +193,13 @@ public class DataChangeEventResolver { } private DOMImmutableDataChangeEvent resolveReplacedEvent(final InstanceIdentifier path, - final Optional listeners, final NodeModification modification, + final Optional listeners, final NodeModification modification, final StoreMetadataNode before, final StoreMetadataNode after) { // FIXME Add task return builder().build(); } - private DOMImmutableDataChangeEvent addNotifyTask(final Optional listeners, final DOMImmutableDataChangeEvent event) { + private DOMImmutableDataChangeEvent addNotifyTask(final Optional listeners, final DOMImmutableDataChangeEvent event) { if (listeners.isPresent()) { final Collection> l = listeners.get().getListeners(); if (!l.isEmpty()) { @@ -206,7 +210,7 @@ public class DataChangeEventResolver { return event; } - private void addNotifyTask(final ListenerRegistrationNode listenerRegistrationNode, final DataChangeScope scope, + private void addNotifyTask(final ListenerTree.Node listenerRegistrationNode, final DataChangeScope scope, final DOMImmutableDataChangeEvent event) { Collection> potential = listenerRegistrationNode.getListeners(); if(!potential.isEmpty()) { diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index de0c146392..a854c4806b 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -18,8 +18,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType; import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification; import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode; @@ -59,7 +58,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch private final ListeningExecutorService executor; private final String name; private final AtomicLong txCounter = new AtomicLong(0); - private final ListenerRegistrationNode listenerTree; + private final ListenerTree listenerTree; private final AtomicReference snapshot; private ModificationApplyOperation operationTree; @@ -69,7 +68,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { this.name = Preconditions.checkNotNull(name); this.executor = Preconditions.checkNotNull(executor); - this.listenerTree = ListenerRegistrationNode.createRoot(); + this.listenerTree = ListenerTree.create(); this.snapshot = new AtomicReference(DataAndMetadataSnapshot.createEmpty()); this.operationTree = new AlwaysFailOperation(); } @@ -114,12 +113,8 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch final DataChangeListenerRegistration reg; synchronized (this) { LOG.debug("{}: Registering data change listener {} for {}",name,listener,path); - ListenerRegistrationNode listenerNode = listenerTree; - for(PathArgument arg : path.getPath()) { - listenerNode = listenerNode.ensureChild(arg); - } - reg = listenerNode.registerDataChangeListener(path, listener, scope); + reg = listenerTree.registerDataChangeListener(path, listener, scope); Optional currentState = snapshot.get().read(path); if (currentState.isPresent()) { diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java deleted file mode 100644 index 854c125af1..0000000000 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java +++ /dev/null @@ -1,154 +0,0 @@ -package org.opendaylight.controller.md.sal.dom.store.impl.tree; - -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; - -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; -import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; - -public class ListenerRegistrationNode implements StoreTreeNode, Identifiable { - - private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class); - - private final ListenerRegistrationNode parent; - private final Map children; - private final PathArgument identifier; - private final HashSet> listeners; - - private ListenerRegistrationNode(final PathArgument identifier) { - this(null, identifier); - } - - private ListenerRegistrationNode(final ListenerRegistrationNode parent, final PathArgument identifier) { - this.parent = parent; - this.identifier = identifier; - children = new HashMap<>(); - listeners = new HashSet<>(); - } - - public final static ListenerRegistrationNode createRoot() { - return new ListenerRegistrationNode(null); - } - - @Override - public PathArgument getIdentifier() { - return identifier; - } - - /** - * Return the list of current listeners. Any caller wishing to use this method - * has to make sure the collection remains unchanged while it's executing. This - * means the caller has to synchronize externally both the registration and - * unregistration process. - * - * @return the list of current listeners - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Collection> getListeners() { - return (Collection) listeners; - } - - @Override - public synchronized Optional getChild(final PathArgument child) { - return Optional.fromNullable(children.get(child)); - } - - public synchronized ListenerRegistrationNode ensureChild(final PathArgument child) { - ListenerRegistrationNode potential = (children.get(child)); - if (potential == null) { - potential = new ListenerRegistrationNode(this, child); - children.put(child, potential); - } - return potential; - } - - /** - * Registers listener on this node. - * - * @param path Full path on which listener is registered. - * @param listener Listener - * @param scope Scope of triggering event. - * @return - */ - public synchronized >> DataChangeListenerRegistration registerDataChangeListener(final InstanceIdentifier path, - final L listener, final DataChangeScope scope) { - - DataChangeListenerRegistration listenerReg = new DataChangeListenerRegistration(path,listener, scope, this); - listeners.add(listenerReg); - LOG.debug("Listener {} registered", listener); - return listenerReg; - } - - private synchronized void removeListener(final DataChangeListenerRegistration listener) { - listeners.remove(listener); - LOG.debug("Listener {} unregistered", listener); - removeThisIfUnused(); - } - - private void removeThisIfUnused() { - if (parent != null && listeners.isEmpty() && children.isEmpty()) { - parent.removeChildIfUnused(this); - } - } - - public boolean isUnused() { - return (listeners.isEmpty() && children.isEmpty()) || areChildrenUnused(); - } - - private boolean areChildrenUnused() { - for (ListenerRegistrationNode child : children.values()) { - if (!child.isUnused()) { - return false; - } - } - return true; - } - - private void removeChildIfUnused(final ListenerRegistrationNode listenerRegistrationNode) { - // FIXME Remove unnecessary - } - - public static class DataChangeListenerRegistration>> - extends AbstractObjectRegistration implements - org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration { - - private final DataChangeScope scope; - private ListenerRegistrationNode node; - private final InstanceIdentifier path; - - public DataChangeListenerRegistration(final InstanceIdentifier path,final T listener, final DataChangeScope scope, - final ListenerRegistrationNode node) { - super(listener); - this.path = path; - this.scope = scope; - this.node = node; - } - - @Override - public DataChangeScope getScope() { - return scope; - } - - @Override - protected void removeRegistration() { - node.removeListener(this); - node = null; - } - - @Override - public InstanceIdentifier getPath() { - return path; - } - } -} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerTree.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerTree.java new file mode 100644 index 0000000000..58a0c69bb0 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerTree.java @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.store.impl.tree; + +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.annotation.concurrent.GuardedBy; + +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +public final class ListenerTree { + private static final Logger LOG = LoggerFactory.getLogger(ListenerTree.class); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); + private final Node rootNode = new Node(null, null); + + private ListenerTree() { + + } + + public static ListenerTree create() { + return new ListenerTree(); + } + + /** + * Registers listener on this node. + * + * @param path Full path on which listener is registered. + * @param listener Listener + * @param scope Scope of triggering event. + * @return Listener registration + */ + public >> DataChangeListenerRegistration registerDataChangeListener(final InstanceIdentifier path, + final L listener, final DataChangeScope scope) { + + // Take the write lock + rwLock.writeLock().lock(); + + try { + Node walkNode = rootNode; + for(PathArgument arg : path.getPath()) { + walkNode = walkNode.ensureChild(arg); + } + + final Node node = walkNode; + DataChangeListenerRegistration listenerReg = new DataChangeListenerRegistration(listener) { + @Override + public DataChangeScope getScope() { + return scope; + } + + @Override + public InstanceIdentifier getPath() { + return path; + } + + @Override + protected void removeRegistration() { + /* + * TODO: Here's an interesting problem. The way the datastore works, it + * enqueues requests towards the listener, so the listener will be + * notified at some point in the future. Now if the registration is + * closed, we will prevent any new events from being delivered, but + * we have no way to purge that queue. + * + * While this does not directly violate the ListenerRegistration + * contract, it is probably not going to be liked by the users. + */ + + // Take the write lock + ListenerTree.this.rwLock.writeLock().lock(); + try { + node.removeListener(this); + } finally { + // Always release the lock + ListenerTree.this.rwLock.writeLock().unlock(); + } + } + }; + + node.addListener(listenerReg); + return listenerReg; + } finally { + // Always release the lock + rwLock.writeLock().unlock(); + } + } + + public Walker getWalker() { + /* + * TODO: The only current user of this method is local to the datastore. + * Since this class represents a read-lock, losing a reference to + * it is a _major_ problem, as the registration process will get + * wedged, eventually grinding the system to a halt. Should an + * external user exist, make the Walker a phantom reference, which + * will cleanup the lock if not told to do so. + */ + final Walker ret = new Walker(rwLock.readLock(), rootNode); + rwLock.readLock().lock(); + return ret; + } + + public static final class Walker implements AutoCloseable { + private final Lock lock; + private final Node node; + + @GuardedBy("this") + private boolean valid = true; + + private Walker(final Lock lock, final Node node) { + this.lock = Preconditions.checkNotNull(lock); + this.node = Preconditions.checkNotNull(node); + } + + public Node getRootNode() { + return node; + } + + @Override + public synchronized void close() { + if (valid) { + lock.unlock(); + valid = false; + } + } + } + + /** + * This is a single node within the listener tree. Note that the data returned from + * and instance of this class is guaranteed to have any relevance or consistency + * only as long as the {@link Walker} instance through which it is reached remains + * unclosed. + */ + public static final class Node implements StoreTreeNode, Identifiable { + private final HashSet> listeners = new HashSet<>(); + private final Map children = new HashMap<>(); + private final PathArgument identifier; + private final Reference parent; + + private Node(final Node parent, final PathArgument identifier) { + this.parent = new WeakReference<>(parent); + this.identifier = identifier; + } + + @Override + public PathArgument getIdentifier() { + return identifier; + } + + @Override + public Optional getChild(final PathArgument child) { + return Optional.fromNullable(children.get(child)); + } + + /** + * Return the list of current listeners. This collection is guaranteed + * to be immutable only while the walker, through which this node is + * reachable remains unclosed. + * + * @return the list of current listeners + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public Collection> getListeners() { + return (Collection) listeners; + } + + private Node ensureChild(final PathArgument child) { + Node potential = (children.get(child)); + if (potential == null) { + potential = new Node(this, child); + children.put(child, potential); + } + return potential; + } + + private void addListener(final DataChangeListenerRegistration listener) { + listeners.add(listener); + LOG.debug("Listener {} registered", listener); + } + + private void removeListener(final DataChangeListenerRegistration listener) { + listeners.remove(listener); + LOG.debug("Listener {} unregistered", listener); + + // We have been called with the write-lock held, so we can perform some cleanup. + removeThisIfUnused(); + } + + private void removeThisIfUnused() { + final Node p = parent.get(); + if (p != null && listeners.isEmpty() && children.isEmpty()) { + p.removeChild(identifier); + } + } + + private void removeChild(final PathArgument arg) { + children.remove(arg); + removeThisIfUnused(); + } + } + + private abstract static class DataChangeListenerRegistration>> extends AbstractListenerRegistration implements + org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration { + + public DataChangeListenerRegistration(final T listener) { + super(listener); + } + } + +} -- 2.36.6