BUG-2673: make IMDS implement DOMStoreTreeChangePublisher 62/16662/2
authorRobert Varga <rovarga@cisco.com>
Mon, 16 Mar 2015 17:54:39 +0000 (18:54 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 17 Mar 2015 08:52:34 +0000 (09:52 +0100)
This patch makes use of the AbstractDOMStoreTreeChangePublisher to do all
the registration wrangling and forwards all notifications towards a
dedicated publisher. Notifications are offloaded to the background
executor, just as normal DataChangeNotifications are. This initial
implementation does not close the registration listener race also
present in the data change notifications, as the notification manager
lacks the APIs to do that.

Change-Id: I36220b52a7a67df1f340b2b8bf32918de025920b
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java [new file with mode: 0644]

index deddd6938ae477eeebbd9fdeb50c79b6d3694eab..1f85b473feb9478f849e33fc278c7907f5530e9e 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -26,6 +27,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -56,7 +58,7 @@ import org.slf4j.LoggerFactory;
  * to implement {@link DOMStore} contract.
  *
  */
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
     private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
     private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
@@ -78,6 +80,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
     private final AtomicLong txCounter = new AtomicLong(0);
 
     private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
+    private final InMemoryDOMStoreTreeChangePublisher changePublisher;
     private final ExecutorService dataChangeListenerExecutor;
     private final boolean debugTransactions;
     private final String name;
@@ -98,6 +101,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
                 new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
                         DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
                         "DataChangeListenerQueueMgr");
+        changePublisher = new InMemoryDOMStoreTreeChangePublisher(this.dataChangeListenerExecutor, maxDataChangeListenerQueueSize);
     }
 
     public void setCloseable(final AutoCloseable closeable) {
@@ -199,6 +203,11 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         };
     }
 
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+        return changePublisher.registerTreeChangeListener(treeId, listener);
+    }
+
     @Override
     protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
         LOG.debug("Tx: {} is closed.", tx.getIdentifier());
@@ -281,6 +290,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
              */
             synchronized (InMemoryDOMDataStore.this) {
                 dataTree.commit(candidate);
+                changePublisher.publishChange(candidate);
                 listenerResolver.resolve(dataChangeListenerNotificationManager);
             }
 
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java
new file mode 100644 (file)
index 0000000..999fb91
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
+    private static final Invoker<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate> MANAGER_INVOKER =
+            new Invoker<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate>() {
+                @Override
+                public void invokeListener(final AbstractDOMDataTreeChangeListenerRegistration<?> listener, final DataTreeCandidate notification) {
+                    // FIXME: this is inefficient, as we could grab the entire queue for the listener and post it
+                    final DOMDataTreeChangeListener inst = listener.getInstance();
+                    if (inst != null) {
+                        inst.onDataTreeChanged(Collections.singletonList(notification));
+                    }
+                }
+            };
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreTreeChangePublisher.class);
+    private final QueuedNotificationManager<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate> notificationManager;
+
+    InMemoryDOMStoreTreeChangePublisher(final ExecutorService listenerExecutor, final int maxQueueSize) {
+        notificationManager = new QueuedNotificationManager<>(listenerExecutor, MANAGER_INVOKER, maxQueueSize, "DataTreeChangeListenerQueueMgr");
+    }
+
+    @Override
+    protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
+        final DataTreeCandidate candidate = new SimpleDataTreeCandidate(path, node);
+
+        for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
+            LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations);
+            notificationManager.submitNotification(reg, candidate);
+        }
+    }
+
+    @Override
+    protected synchronized void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+        LOG.debug("Closing registration {}", registration);
+
+        // FIXME: remove the queue for this registration and make sure we clear it
+    }
+
+    synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) {
+        // Runs synchronized with registrationRemoved()
+        processCandidateTree(candidate);
+    }
+}
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java
new file mode 100644 (file)
index 0000000..701841c
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+
+final class SimpleDataTreeCandidate implements DataTreeCandidate {
+    private final YangInstanceIdentifier rootPath;
+    private final DataTreeCandidateNode rootNode;
+
+    SimpleDataTreeCandidate(final YangInstanceIdentifier rootPath, final DataTreeCandidateNode rootNode) {
+        this.rootPath = Preconditions.checkNotNull(rootPath);
+        this.rootNode = Preconditions.checkNotNull(rootNode);
+    }
+
+    @Override
+    public DataTreeCandidateNode getRootNode() {
+        return rootNode;
+    }
+
+    @Override
+    public YangInstanceIdentifier getRootPath() {
+        return rootPath;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString();
+    }
+}
\ No newline at end of file