From d43ffaf3191736e19f2c9f837e8d96aa6c9cfaa1 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 16 Mar 2015 18:54:39 +0100 Subject: [PATCH] BUG-2673: make IMDS implement DOMStoreTreeChangePublisher This patch makes use of the AbstractDOMStoreTreeChangePublisher to do all the registration wrangling and forwards all notifications towards a dedicated publisher. Notifications are offloaded to the background executor, just as normal DataChangeNotifications are. This initial implementation does not close the registration listener race also present in the data change notifications, as the notification manager lacks the APIs to do that. Change-Id: I36220b52a7a67df1f340b2b8bf32918de025920b Signed-off-by: Robert Varga --- .../dom/store/impl/InMemoryDOMDataStore.java | 12 +++- .../InMemoryDOMStoreTreeChangePublisher.java | 65 +++++++++++++++++++ .../store/impl/SimpleDataTreeCandidate.java | 39 +++++++++++ 3 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java create mode 100644 opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index deddd6938a..1f85b473fe 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -26,6 +27,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.Identifiable; @@ -56,7 +58,7 @@ import org.slf4j.LoggerFactory; * to implement {@link DOMStore} contract. * */ -public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable, SchemaContextListener, AutoCloseable { +public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher { private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); private static final ListenableFuture SUCCESSFUL_FUTURE = Futures.immediateFuture(null); private static final ListenableFuture CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE); @@ -78,6 +80,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D private final AtomicLong txCounter = new AtomicLong(0); private final QueuedNotificationManager, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager; + private final InMemoryDOMStoreTreeChangePublisher changePublisher; private final ExecutorService dataChangeListenerExecutor; private final boolean debugTransactions; private final String name; @@ -98,6 +101,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D new QueuedNotificationManager<>(this.dataChangeListenerExecutor, DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize, "DataChangeListenerQueueMgr"); + changePublisher = new InMemoryDOMStoreTreeChangePublisher(this.dataChangeListenerExecutor, maxDataChangeListenerQueueSize); } public void setCloseable(final AutoCloseable closeable) { @@ -199,6 +203,11 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D }; } + @Override + public ListenerRegistration registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) { + return changePublisher.registerTreeChangeListener(treeId, listener); + } + @Override protected void transactionAborted(final SnapshotBackedWriteTransaction tx) { LOG.debug("Tx: {} is closed.", tx.getIdentifier()); @@ -281,6 +290,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D */ synchronized (InMemoryDOMDataStore.this) { dataTree.commit(candidate); + changePublisher.publishChange(candidate); listenerResolver.resolve(dataChangeListenerNotificationManager); } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java new file mode 100644 index 0000000000..999fb91c65 --- /dev/null +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreTreeChangePublisher.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.store.impl; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; +import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher { + private static final Invoker, DataTreeCandidate> MANAGER_INVOKER = + new Invoker, DataTreeCandidate>() { + @Override + public void invokeListener(final AbstractDOMDataTreeChangeListenerRegistration listener, final DataTreeCandidate notification) { + // FIXME: this is inefficient, as we could grab the entire queue for the listener and post it + final DOMDataTreeChangeListener inst = listener.getInstance(); + if (inst != null) { + inst.onDataTreeChanged(Collections.singletonList(notification)); + } + } + }; + private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreTreeChangePublisher.class); + private final QueuedNotificationManager, DataTreeCandidate> notificationManager; + + InMemoryDOMStoreTreeChangePublisher(final ExecutorService listenerExecutor, final int maxQueueSize) { + notificationManager = new QueuedNotificationManager<>(listenerExecutor, MANAGER_INVOKER, maxQueueSize, "DataTreeChangeListenerQueueMgr"); + } + + @Override + protected void notifyListeners(final Collection> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) { + final DataTreeCandidate candidate = new SimpleDataTreeCandidate(path, node); + + for (AbstractDOMDataTreeChangeListenerRegistration reg : registrations) { + LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations); + notificationManager.submitNotification(reg, candidate); + } + } + + @Override + protected synchronized void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { + LOG.debug("Closing registration {}", registration); + + // FIXME: remove the queue for this registration and make sure we clear it + } + + synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) { + // Runs synchronized with registrationRemoved() + processCandidateTree(candidate); + } +} diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java new file mode 100644 index 0000000000..701841ca46 --- /dev/null +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SimpleDataTreeCandidate.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.store.impl; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; + +final class SimpleDataTreeCandidate implements DataTreeCandidate { + private final YangInstanceIdentifier rootPath; + private final DataTreeCandidateNode rootNode; + + SimpleDataTreeCandidate(final YangInstanceIdentifier rootPath, final DataTreeCandidateNode rootNode) { + this.rootPath = Preconditions.checkNotNull(rootPath); + this.rootNode = Preconditions.checkNotNull(rootNode); + } + + @Override + public DataTreeCandidateNode getRootNode() { + return rootNode; + } + + @Override + public YangInstanceIdentifier getRootPath() { + return rootPath; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString(); + } +} \ No newline at end of file -- 2.36.6