import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
private static final DOMImmutableDataChangeEvent NO_CHANGE = builder().build();
private final ImmutableList.Builder<ChangeListenerNotifyTask> tasks = ImmutableList.builder();
private InstanceIdentifier rootPath;
- private ListenerRegistrationNode listenerRoot;
+ private ListenerTree listenerRoot;
private NodeModification modificationRoot;
private Optional<StoreMetadataNode> beforeRoot;
private Optional<StoreMetadataNode> afterRoot;
return this;
}
- protected ListenerRegistrationNode getListenerRoot() {
+ protected ListenerTree getListenerRoot() {
return listenerRoot;
}
- protected DataChangeEventResolver setListenerRoot(final ListenerRegistrationNode listenerRoot) {
+ protected DataChangeEventResolver setListenerRoot(final ListenerTree listenerRoot) {
this.listenerRoot = listenerRoot;
return this;
}
}
public Iterable<ChangeListenerNotifyTask> resolve() {
- LOG.trace("Resolving events for {}" ,modificationRoot);
- resolveAnyChangeEvent(rootPath, Optional.of(listenerRoot), modificationRoot, beforeRoot, afterRoot);
- return tasks.build();
+ LOG.trace("Resolving events for {}", modificationRoot);
+
+ try (final Walker w = listenerRoot.getWalker()) {
+ resolveAnyChangeEvent(rootPath, Optional.of(w.getRootNode()), modificationRoot, beforeRoot, afterRoot);
+ return tasks.build();
+ }
}
private DOMImmutableDataChangeEvent resolveAnyChangeEvent(final InstanceIdentifier path,
- final Optional<ListenerRegistrationNode> listeners, final NodeModification modification,
+ final Optional<ListenerTree.Node> listeners, final NodeModification modification,
final Optional<StoreMetadataNode> before, final Optional<StoreMetadataNode> after) {
// No listeners are present in listener registration subtree
// no before and after state is present
* @return
*/
private DOMImmutableDataChangeEvent resolveCreateEvent(final InstanceIdentifier path,
- final Optional<ListenerRegistrationNode> listeners, final StoreMetadataNode afterState) {
+ final Optional<ListenerTree.Node> listeners, final StoreMetadataNode afterState) {
final NormalizedNode<?, ?> node = afterState.getData();
Builder builder = builder().setAfter(node).addCreated(path, node);
for (StoreMetadataNode child : afterState.getChildren()) {
PathArgument childId = child.getIdentifier();
- Optional<ListenerRegistrationNode> childListeners = getChild(listeners, childId);
+ Optional<ListenerTree.Node> childListeners = getChild(listeners, childId);
InstanceIdentifier childPath = StoreUtils.append(path, childId);
builder.merge(resolveCreateEvent(childPath, childListeners, child));
}
private DOMImmutableDataChangeEvent resolveDeleteEvent(final InstanceIdentifier path,
- final Optional<ListenerRegistrationNode> listeners, final StoreMetadataNode beforeState) {
+ final Optional<ListenerTree.Node> listeners, final StoreMetadataNode beforeState) {
final NormalizedNode<?, ?> node = beforeState.getData();
Builder builder = builder().setBefore(node).addRemoved(path, node);
for (StoreMetadataNode child : beforeState.getChildren()) {
PathArgument childId = child.getIdentifier();
- Optional<ListenerRegistrationNode> childListeners = getChild(listeners, childId);
+ Optional<ListenerTree.Node> childListeners = getChild(listeners, childId);
InstanceIdentifier childPath = StoreUtils.append(path, childId);
builder.merge(resolveDeleteEvent(childPath, childListeners, child));
}
}
private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final InstanceIdentifier path,
- final Optional<ListenerRegistrationNode> listeners, final NodeModification modification,
+ final Optional<ListenerTree.Node> listeners, final NodeModification modification,
final StoreMetadataNode before, final StoreMetadataNode after) {
Builder one = builder().setBefore(before.getData()).setAfter(after.getData());
for (NodeModification childMod : modification.getModifications()) {
PathArgument childId = childMod.getIdentifier();
InstanceIdentifier childPath = append(path, childId);
- Optional<ListenerRegistrationNode> childListen = getChild(listeners, childId);
+ Optional<ListenerTree.Node> childListen = getChild(listeners, childId);
Optional<StoreMetadataNode> childBefore = before.getChild(childId);
Optional<StoreMetadataNode> childAfter = after.getChild(childId);
}
private DOMImmutableDataChangeEvent resolveReplacedEvent(final InstanceIdentifier path,
- final Optional<ListenerRegistrationNode> listeners, final NodeModification modification,
+ final Optional<ListenerTree.Node> listeners, final NodeModification modification,
final StoreMetadataNode before, final StoreMetadataNode after) {
// FIXME Add task
return builder().build();
}
- private DOMImmutableDataChangeEvent addNotifyTask(final Optional<ListenerRegistrationNode> listeners, final DOMImmutableDataChangeEvent event) {
+ private DOMImmutableDataChangeEvent addNotifyTask(final Optional<ListenerTree.Node> listeners, final DOMImmutableDataChangeEvent event) {
if (listeners.isPresent()) {
final Collection<DataChangeListenerRegistration<?>> l = listeners.get().getListeners();
if (!l.isEmpty()) {
return event;
}
- private void addNotifyTask(final ListenerRegistrationNode listenerRegistrationNode, final DataChangeScope scope,
+ private void addNotifyTask(final ListenerTree.Node listenerRegistrationNode, final DataChangeScope scope,
final DOMImmutableDataChangeEvent event) {
Collection<DataChangeListenerRegistration<?>> potential = listenerRegistrationNode.getListeners();
if(!potential.isEmpty()) {
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
private final ListeningExecutorService executor;
private final String name;
private final AtomicLong txCounter = new AtomicLong(0);
- private final ListenerRegistrationNode listenerTree;
+ private final ListenerTree listenerTree;
private final AtomicReference<DataAndMetadataSnapshot> snapshot;
private ModificationApplyOperation operationTree;
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
this.name = Preconditions.checkNotNull(name);
this.executor = Preconditions.checkNotNull(executor);
- this.listenerTree = ListenerRegistrationNode.createRoot();
+ this.listenerTree = ListenerTree.create();
this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
this.operationTree = new AlwaysFailOperation();
}
final DataChangeListenerRegistration<L> reg;
synchronized (this) {
LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
- ListenerRegistrationNode listenerNode = listenerTree;
- for(PathArgument arg : path.getPath()) {
- listenerNode = listenerNode.ensureChild(arg);
- }
- reg = listenerNode.registerDataChangeListener(path, listener, scope);
+ reg = listenerTree.registerDataChangeListener(path, listener, scope);
Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
if (currentState.isPresent()) {
+++ /dev/null
-package org.opendaylight.controller.md.sal.dom.store.impl.tree;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-
-public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>, Identifiable<PathArgument> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationNode.class);
-
- private final ListenerRegistrationNode parent;
- private final Map<PathArgument, ListenerRegistrationNode> children;
- private final PathArgument identifier;
- private final HashSet<DataChangeListenerRegistration<?>> listeners;
-
- private ListenerRegistrationNode(final PathArgument identifier) {
- this(null, identifier);
- }
-
- private ListenerRegistrationNode(final ListenerRegistrationNode parent, final PathArgument identifier) {
- this.parent = parent;
- this.identifier = identifier;
- children = new HashMap<>();
- listeners = new HashSet<>();
- }
-
- public final static ListenerRegistrationNode createRoot() {
- return new ListenerRegistrationNode(null);
- }
-
- @Override
- public PathArgument getIdentifier() {
- return identifier;
- }
-
- /**
- * Return the list of current listeners. Any caller wishing to use this method
- * has to make sure the collection remains unchanged while it's executing. This
- * means the caller has to synchronize externally both the registration and
- * unregistration process.
- *
- * @return the list of current listeners
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
- return (Collection) listeners;
- }
-
- @Override
- public synchronized Optional<ListenerRegistrationNode> getChild(final PathArgument child) {
- return Optional.fromNullable(children.get(child));
- }
-
- public synchronized ListenerRegistrationNode ensureChild(final PathArgument child) {
- ListenerRegistrationNode potential = (children.get(child));
- if (potential == null) {
- potential = new ListenerRegistrationNode(this, child);
- children.put(child, potential);
- }
- return potential;
- }
-
- /**
- * Registers listener on this node.
- *
- * @param path Full path on which listener is registered.
- * @param listener Listener
- * @param scope Scope of triggering event.
- * @return
- */
- public synchronized <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
- final L listener, final DataChangeScope scope) {
-
- DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(path,listener, scope, this);
- listeners.add(listenerReg);
- LOG.debug("Listener {} registered", listener);
- return listenerReg;
- }
-
- private synchronized void removeListener(final DataChangeListenerRegistration<?> listener) {
- listeners.remove(listener);
- LOG.debug("Listener {} unregistered", listener);
- removeThisIfUnused();
- }
-
- private void removeThisIfUnused() {
- if (parent != null && listeners.isEmpty() && children.isEmpty()) {
- parent.removeChildIfUnused(this);
- }
- }
-
- public boolean isUnused() {
- return (listeners.isEmpty() && children.isEmpty()) || areChildrenUnused();
- }
-
- private boolean areChildrenUnused() {
- for (ListenerRegistrationNode child : children.values()) {
- if (!child.isUnused()) {
- return false;
- }
- }
- return true;
- }
-
- private void removeChildIfUnused(final ListenerRegistrationNode listenerRegistrationNode) {
- // FIXME Remove unnecessary
- }
-
- public static class DataChangeListenerRegistration<T extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
- extends AbstractObjectRegistration<T> implements
- org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<T> {
-
- private final DataChangeScope scope;
- private ListenerRegistrationNode node;
- private final InstanceIdentifier path;
-
- public DataChangeListenerRegistration(final InstanceIdentifier path,final T listener, final DataChangeScope scope,
- final ListenerRegistrationNode node) {
- super(listener);
- this.path = path;
- this.scope = scope;
- this.node = node;
- }
-
- @Override
- public DataChangeScope getScope() {
- return scope;
- }
-
- @Override
- protected void removeRegistration() {
- node.removeListener(this);
- node = null;
- }
-
- @Override
- public InstanceIdentifier getPath() {
- return path;
- }
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl.tree;
+
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+public final class ListenerTree {
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerTree.class);
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ private final Node rootNode = new Node(null, null);
+
+ private ListenerTree() {
+
+ }
+
+ public static ListenerTree create() {
+ return new ListenerTree();
+ }
+
+ /**
+ * Registers listener on this node.
+ *
+ * @param path Full path on which listener is registered.
+ * @param listener Listener
+ * @param scope Scope of triggering event.
+ * @return Listener registration
+ */
+ public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(final InstanceIdentifier path,
+ final L listener, final DataChangeScope scope) {
+
+ // Take the write lock
+ rwLock.writeLock().lock();
+
+ try {
+ Node walkNode = rootNode;
+ for(PathArgument arg : path.getPath()) {
+ walkNode = walkNode.ensureChild(arg);
+ }
+
+ final Node node = walkNode;
+ DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(listener) {
+ @Override
+ public DataChangeScope getScope() {
+ return scope;
+ }
+
+ @Override
+ public InstanceIdentifier getPath() {
+ return path;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ /*
+ * TODO: Here's an interesting problem. The way the datastore works, it
+ * enqueues requests towards the listener, so the listener will be
+ * notified at some point in the future. Now if the registration is
+ * closed, we will prevent any new events from being delivered, but
+ * we have no way to purge that queue.
+ *
+ * While this does not directly violate the ListenerRegistration
+ * contract, it is probably not going to be liked by the users.
+ */
+
+ // Take the write lock
+ ListenerTree.this.rwLock.writeLock().lock();
+ try {
+ node.removeListener(this);
+ } finally {
+ // Always release the lock
+ ListenerTree.this.rwLock.writeLock().unlock();
+ }
+ }
+ };
+
+ node.addListener(listenerReg);
+ return listenerReg;
+ } finally {
+ // Always release the lock
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ public Walker getWalker() {
+ /*
+ * TODO: The only current user of this method is local to the datastore.
+ * Since this class represents a read-lock, losing a reference to
+ * it is a _major_ problem, as the registration process will get
+ * wedged, eventually grinding the system to a halt. Should an
+ * external user exist, make the Walker a phantom reference, which
+ * will cleanup the lock if not told to do so.
+ */
+ final Walker ret = new Walker(rwLock.readLock(), rootNode);
+ rwLock.readLock().lock();
+ return ret;
+ }
+
+ public static final class Walker implements AutoCloseable {
+ private final Lock lock;
+ private final Node node;
+
+ @GuardedBy("this")
+ private boolean valid = true;
+
+ private Walker(final Lock lock, final Node node) {
+ this.lock = Preconditions.checkNotNull(lock);
+ this.node = Preconditions.checkNotNull(node);
+ }
+
+ public Node getRootNode() {
+ return node;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (valid) {
+ lock.unlock();
+ valid = false;
+ }
+ }
+ }
+
+ /**
+ * This is a single node within the listener tree. Note that the data returned from
+ * and instance of this class is guaranteed to have any relevance or consistency
+ * only as long as the {@link Walker} instance through which it is reached remains
+ * unclosed.
+ */
+ public static final class Node implements StoreTreeNode<Node>, Identifiable<PathArgument> {
+ private final HashSet<DataChangeListenerRegistration<?>> listeners = new HashSet<>();
+ private final Map<PathArgument, Node> children = new HashMap<>();
+ private final PathArgument identifier;
+ private final Reference<Node> parent;
+
+ private Node(final Node parent, final PathArgument identifier) {
+ this.parent = new WeakReference<>(parent);
+ this.identifier = identifier;
+ }
+
+ @Override
+ public PathArgument getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public Optional<Node> getChild(final PathArgument child) {
+ return Optional.fromNullable(children.get(child));
+ }
+
+ /**
+ * Return the list of current listeners. This collection is guaranteed
+ * to be immutable only while the walker, through which this node is
+ * reachable remains unclosed.
+ *
+ * @return the list of current listeners
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Collection<org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<?>> getListeners() {
+ return (Collection) listeners;
+ }
+
+ private Node ensureChild(final PathArgument child) {
+ Node potential = (children.get(child));
+ if (potential == null) {
+ potential = new Node(this, child);
+ children.put(child, potential);
+ }
+ return potential;
+ }
+
+ private void addListener(final DataChangeListenerRegistration<?> listener) {
+ listeners.add(listener);
+ LOG.debug("Listener {} registered", listener);
+ }
+
+ private void removeListener(final DataChangeListenerRegistration<?> listener) {
+ listeners.remove(listener);
+ LOG.debug("Listener {} unregistered", listener);
+
+ // We have been called with the write-lock held, so we can perform some cleanup.
+ removeThisIfUnused();
+ }
+
+ private void removeThisIfUnused() {
+ final Node p = parent.get();
+ if (p != null && listeners.isEmpty() && children.isEmpty()) {
+ p.removeChild(identifier);
+ }
+ }
+
+ private void removeChild(final PathArgument arg) {
+ children.remove(arg);
+ removeThisIfUnused();
+ }
+ }
+
+ private abstract static class DataChangeListenerRegistration<T extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> extends AbstractListenerRegistration<T> implements
+ org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration<T> {
+
+ public DataChangeListenerRegistration(final T listener) {
+ super(listener);
+ }
+ }
+
+}