import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private final AtomicLong txCounter = new AtomicLong(0);
private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
+ private final InMemoryDOMStoreTreeChangePublisher changePublisher;
private final ExecutorService dataChangeListenerExecutor;
private final boolean debugTransactions;
private final String name;
new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
"DataChangeListenerQueueMgr");
+ changePublisher = new InMemoryDOMStoreTreeChangePublisher(this.dataChangeListenerExecutor, maxDataChangeListenerQueueSize);
}
public void setCloseable(final AutoCloseable closeable) {
};
}
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+ return changePublisher.registerTreeChangeListener(treeId, listener);
+ }
+
@Override
protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
LOG.debug("Tx: {} is closed.", tx.getIdentifier());
*/
synchronized (InMemoryDOMDataStore.this) {
dataTree.commit(candidate);
+ changePublisher.publishChange(candidate);
listenerResolver.resolve(dataChangeListenerNotificationManager);
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class InMemoryDOMStoreTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
+ private static final Invoker<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate> MANAGER_INVOKER =
+ new Invoker<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate>() {
+ @Override
+ public void invokeListener(final AbstractDOMDataTreeChangeListenerRegistration<?> listener, final DataTreeCandidate notification) {
+ // FIXME: this is inefficient, as we could grab the entire queue for the listener and post it
+ final DOMDataTreeChangeListener inst = listener.getInstance();
+ if (inst != null) {
+ inst.onDataTreeChanged(Collections.singletonList(notification));
+ }
+ }
+ };
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreTreeChangePublisher.class);
+ private final QueuedNotificationManager<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate> notificationManager;
+
+ InMemoryDOMStoreTreeChangePublisher(final ExecutorService listenerExecutor, final int maxQueueSize) {
+ notificationManager = new QueuedNotificationManager<>(listenerExecutor, MANAGER_INVOKER, maxQueueSize, "DataTreeChangeListenerQueueMgr");
+ }
+
+ @Override
+ protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations, final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
+ final DataTreeCandidate candidate = new SimpleDataTreeCandidate(path, node);
+
+ for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
+ LOG.debug("Enqueueing candidate {} to registration {}", candidate, registrations);
+ notificationManager.submitNotification(reg, candidate);
+ }
+ }
+
+ @Override
+ protected synchronized void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+ LOG.debug("Closing registration {}", registration);
+
+ // FIXME: remove the queue for this registration and make sure we clear it
+ }
+
+ synchronized void publishChange(@Nonnull final DataTreeCandidate candidate) {
+ // Runs synchronized with registrationRemoved()
+ processCandidateTree(candidate);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+
+final class SimpleDataTreeCandidate implements DataTreeCandidate {
+ private final YangInstanceIdentifier rootPath;
+ private final DataTreeCandidateNode rootNode;
+
+ SimpleDataTreeCandidate(final YangInstanceIdentifier rootPath, final DataTreeCandidateNode rootNode) {
+ this.rootPath = Preconditions.checkNotNull(rootPath);
+ this.rootNode = Preconditions.checkNotNull(rootNode);
+ }
+
+ @Override
+ public DataTreeCandidateNode getRootNode() {
+ return rootNode;
+ }
+
+ @Override
+ public YangInstanceIdentifier getRootPath() {
+ return rootPath;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("rootPath", rootPath).add("rootNode", rootNode).toString();
+ }
+}
\ No newline at end of file