import java.util.Objects;
import java.util.Set;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
}
@SuppressWarnings("checkstyle:hiddenField")
- public boolean subtreesOverlap(YangInstanceIdentifier iid, LogicalDatastoreType store,
- AsyncDataBroker.DataChangeScope scope) {
+ public boolean subtreesOverlap(YangInstanceIdentifier iid, LogicalDatastoreType store) {
if (this.store != null && !this.store.equals(store)) {
return false;
}
String otherIidString = toIidCompString(iid);
- switch (scope) {
- case BASE:
- return isParent(iidString, otherIidString);
- case ONE: //for now just treat like SUBTREE, even though it's not
- case SUBTREE:
- return isParent(iidString, otherIidString) || isParent(otherIidString, iidString);
- default:
- return false;
- }
+ return isParent(iidString, otherIidString) || isParent(otherIidString, iidString);
}
@SuppressWarnings("checkstyle:hiddenField")
writeWatches.add(watch);
}
- private boolean isRegistrationWatched(YangInstanceIdentifier iid,
- LogicalDatastoreType store, DataChangeScope scope) {
+ private boolean isRegistrationWatched(YangInstanceIdentifier iid, LogicalDatastoreType store) {
if (registrationWatches.isEmpty()) {
return true;
}
for (Watch regInterest : registrationWatches) {
- if (regInterest.subtreesOverlap(iid, store, scope)) {
+ if (regInterest.subtreesOverlap(iid, store)) {
return true;
}
}
return new TracingWriteTransaction(delegate.newWriteOnlyTransaction(), this, writeTransactionsRegistry);
}
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
- LogicalDatastoreType store, YangInstanceIdentifier yiid,
- DOMDataChangeListener listener, DataChangeScope scope) {
- if (isRegistrationWatched(yiid, store, scope)) {
- LOG.warn("Registration (registerDataChangeListener) for {} from {}",
- toPathString(yiid), getStackSummary());
- }
- return delegate.registerDataChangeListener(store, yiid, listener, scope);
- }
-
@Override
public DOMTransactionChain createTransactionChain(TransactionChainListener transactionChainListener) {
return new TracingTransactionChain(
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(
@Nonnull DOMDataTreeIdentifier domDataTreeIdentifier, @Nonnull L listener) {
if (isRegistrationWatched(domDataTreeIdentifier.getRootIdentifier(),
- domDataTreeIdentifier.getDatastoreType(), DataChangeScope.SUBTREE)) {
+ domDataTreeIdentifier.getDatastoreType())) {
LOG.warn("{} registration (registerDataTreeChangeListener) for {} from {}.",
listener instanceof ClusteredDOMDataTreeChangeListener ? "Clustered" : "Non-clustered",
toPathString(domDataTreeIdentifier.getRootIdentifier()), getStackSummary());
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-
-package org.opendaylight.controller.md.sal.binding.api;
-
-/**
- * <p>
- * ClusteredDataChangeListener is a marker interface to enable data change notifications on all instances in a cluster,
- * where this listener is registered.
- * </p>
- *
- * <p>Applications should implement ClusteredDataChangeListener instead of DataChangeListener, if they want to listen
- * to data change notifications on any node of clustered datastore. DataChangeListener enables data change notifications
- * only at leader of the datastore shard.</p>
- *
- * @deprecated Replaced by {@link ClusteredDataTreeChangeListener}
- */
-@Deprecated
-public interface ClusteredDataChangeListener extends DataChangeListener {
-}
package org.opendaylight.controller.md.sal.binding.api;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainFactory;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
* @see AsyncDataBroker
* @see TransactionChainFactory
*/
-public interface DataBroker extends AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>,
+public interface DataBroker extends AsyncDataBroker<InstanceIdentifier<?>, DataObject>,
TransactionChainFactory<InstanceIdentifier<?>, DataObject>, TransactionFactory, BindingService,
DataTreeChangeService {
@Override
@Override
WriteTransaction newWriteOnlyTransaction();
- @Override
- ListenerRegistration<DataChangeListener> registerDataChangeListener(LogicalDatastoreType store,
- InstanceIdentifier<?> path, DataChangeListener listener, DataChangeScope triggeringScope);
-
@Override
BindingTransactionChain createTransactionChain(TransactionChainListener listener);
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.binding.api;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-/*
- * DataChangeListener enables data change notifications only at leader of the datastore shard
- *
- * @Deprecated Replaced by {@link DataTreeChangeListener}
- */
-@Deprecated
-public interface DataChangeListener extends AsyncDataChangeListener<InstanceIdentifier<?>, DataObject> {
- @Override
- void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change);
-}
/**
* Interface implemented by classes interested in receiving notifications about
- * data tree changes. This interface differs from {@link DataChangeListener}
- * in that it provides a cursor-based view of the change, which has potentially
+ * data tree changes. This interface provides a cursor-based view of the change, which has potentially
* lower overhead and allow more flexible consumption of change event.
*
* <p>
import com.google.common.collect.ForwardingObject;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
/**
* Utility {@link DataBroker} implementation which forwards all interface method
return delegate().registerDataTreeChangeListener(treeId, listener);
}
- @Override
- public ListenerRegistration<DataChangeListener> registerDataChangeListener(LogicalDatastoreType store,
- InstanceIdentifier<?> path, DataChangeListener listener, DataChangeScope triggeringScope) {
- return delegate().registerDataChangeListener(store, path, listener, triggeringScope);
- }
-
@Override
public BindingTransactionChain createTransactionChain(TransactionChainListener listener) {
return delegate().createTransactionChain(listener);
*/
package org.opendaylight.controller.md.sal.binding.impl;
-import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
-import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Delegator;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
return domDataBroker;
}
- public ListenerRegistration<DataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final InstanceIdentifier<?> path, final DataChangeListener listener,
- final DataChangeScope triggeringScope) {
- final DOMDataChangeListener domDataChangeListener;
-
- if (listener instanceof ClusteredDataChangeListener) {
- domDataChangeListener = new TranslatingClusteredDataChangeInvoker(store, path, listener, triggeringScope);
- } else {
- domDataChangeListener = new TranslatingDataChangeInvoker(path, listener);
- }
- final YangInstanceIdentifier domPath = codec.toYangInstanceIdentifierBlocking(path);
- final ListenerRegistration<DOMDataChangeListener> domRegistration =
- domDataBroker.registerDataChangeListener(store, domPath, domDataChangeListener, triggeringScope);
- return new ListenerRegistrationImpl(listener, domRegistration);
- }
-
protected Map<InstanceIdentifier<?>, DataObject> toBinding(final InstanceIdentifier<?> path,
final Map<YangInstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized) {
final Map<InstanceIdentifier<?>, DataObject> newMap = new HashMap<>();
.apply(Optional.<NormalizedNode<?, ?>>of(data));
}
- private class TranslatingDataChangeInvoker implements DOMDataChangeListener {
- private final DataChangeListener bindingDataChangeListener;
- private final InstanceIdentifier<?> path;
-
- TranslatingDataChangeInvoker(final InstanceIdentifier<?> path,
- final DataChangeListener bindingDataChangeListener) {
- this.path = path;
- this.bindingDataChangeListener = bindingDataChangeListener;
- }
-
- @Override
- public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- bindingDataChangeListener.onDataChanged(new TranslatedDataChangeEvent(change, path));
- }
-
- @Override
- public String toString() {
- return bindingDataChangeListener.getClass().getName();
- }
- }
-
- /**
- * Translator for ClusteredDataChangeListener.
- */
- private class TranslatingClusteredDataChangeInvoker extends TranslatingDataChangeInvoker implements
- ClusteredDOMDataChangeListener {
-
- TranslatingClusteredDataChangeInvoker(final LogicalDatastoreType store, final InstanceIdentifier<?> path,
- final DataChangeListener bindingDataChangeListener, final DataChangeScope triggeringScope) {
- super(path, bindingDataChangeListener);
- }
- }
-
- private class TranslatedDataChangeEvent implements AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> {
- private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> domEvent;
- private final InstanceIdentifier<?> path;
-
- private Map<InstanceIdentifier<?>, DataObject> createdCache;
- private Map<InstanceIdentifier<?>, DataObject> updatedCache;
- private Map<InstanceIdentifier<?>, DataObject> originalCache;
- private Set<InstanceIdentifier<?>> removedCache;
- private Optional<DataObject> originalDataCache;
- private Optional<DataObject> updatedDataCache;
-
- TranslatedDataChangeEvent(
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
- final InstanceIdentifier<?> path) {
- this.domEvent = change;
- this.path = path;
- }
-
- @Override
- public Map<InstanceIdentifier<?>, DataObject> getCreatedData() {
- if (createdCache == null) {
- createdCache = Collections.unmodifiableMap(toBinding(path, domEvent.getCreatedData()));
- }
- return createdCache;
- }
-
- @Override
- public Map<InstanceIdentifier<?>, DataObject> getUpdatedData() {
- if (updatedCache == null) {
- updatedCache = Collections.unmodifiableMap(toBinding(path, domEvent.getUpdatedData()));
- }
- return updatedCache;
-
- }
-
- @Override
- public Set<InstanceIdentifier<?>> getRemovedPaths() {
- if (removedCache == null) {
- removedCache = Collections.unmodifiableSet(toBinding(path, domEvent.getRemovedPaths()));
- }
- return removedCache;
- }
-
- @Override
- public Map<InstanceIdentifier<?>, DataObject> getOriginalData() {
- if (originalCache == null) {
- originalCache = Collections.unmodifiableMap(toBinding(path, domEvent.getOriginalData()));
- }
- return originalCache;
-
- }
-
- @Override
- public DataObject getOriginalSubtree() {
- if (originalDataCache == null) {
- if (domEvent.getOriginalSubtree() != null) {
- originalDataCache = toBindingData(path, domEvent.getOriginalSubtree());
- } else {
- originalDataCache = Optional.absent();
- }
- }
- return originalDataCache.orNull();
- }
-
- @Override
- public DataObject getUpdatedSubtree() {
- if (updatedDataCache == null) {
- if (domEvent.getUpdatedSubtree() != null) {
- updatedDataCache = toBindingData(path, domEvent.getUpdatedSubtree());
- } else {
- updatedDataCache = Optional.absent();
- }
- }
- return updatedDataCache.orNull();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(TranslatedDataChangeEvent.class) //
- .add("created", getCreatedData()) //
- .add("updated", getUpdatedData()) //
- .add("removed", getRemovedPaths()) //
- .add("dom", domEvent) //
- .toString();
- }
- }
-
- private static class ListenerRegistrationImpl extends AbstractListenerRegistration<DataChangeListener> {
- private final ListenerRegistration<DOMDataChangeListener> registration;
-
- ListenerRegistrationImpl(final DataChangeListener listener,
- final ListenerRegistration<DOMDataChangeListener> registration) {
- super(listener);
- this.registration = registration;
- }
-
- @Override
- protected void removeRegistration() {
- registration.close();
- }
- }
-
@Override
public void close() {
}
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelListKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
domMountPointService.createMountPoint(TLL_INSTANCE_ID_BI)
.addService(DOMDataBroker.class, new DOMDataBroker() {
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(
- final LogicalDatastoreType store, final YangInstanceIdentifier path,
- final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
- throw new UnsupportedOperationException();
- }
-
@Override
public DOMDataWriteTransaction newWriteOnlyTransaction() {
throw new UnsupportedOperationException();
*/
package org.opendaylight.controller.md.sal.common.api.data;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Path;
/**
* @param <D>
* Type of data (payload), which represents data payload
*/
-public interface AsyncDataBroker<P extends Path<P>, D, L extends AsyncDataChangeListener<P, D>> extends //
- AsyncDataTransactionFactory<P, D> {
-
- /**
- * Scope of Data Change
- *
- * <p>
- * Represents scope of data change (addition, replacement, deletion).
- * The terminology for scope types is reused from LDAP.
- *
- * <h2>Examples</h2>
- *
- * <p>
- * Following is an example model with comments describing what notifications
- * you would receive based on the scope you specify, when you are
- * registering for changes on container a.
- *
- * <pre>
- * container a // scope BASE, ONE, SUBTREE
- * leaf "foo" // scope ONE, SUBTREE
- * container // scope ONE, SUBTREE
- * leaf "bar" // scope SUBTREE
- * list list // scope ONE, SUBTREE
- * list [a] // scope SUBTREE
- * id "a" // scope SUBTREE
- * list [b] // scope SUBTREE
- * id "b" // scope SUBTREE
- * </pre>
- *
- * <p>
- * Following is an example model with comments describing what notifications
- * you would receive based on the scope you specify, when you are
- * registering for changes on list list (without specifying concrete item in
- * the list).
- *
- * <pre>
- * list list // scope BASE, ONE, SUBTREE
- * list [a] // scope ONE, SUBTREE
- * id "a" // scope SUBTREE
- * list [b] // scope ONE, SUBTREE
- * id "b" // scope SUBTREE
- * </pre>
- *
- *
- * @see <a href="http://www.idevelopment.info/data/LDAP/LDAP_Resources/SEARCH_Setting_the_SCOPE_Parameter.shtml">LDAP</a>
- */
- enum DataChangeScope {
-
- /**
- * Represents only a direct change of the node, such as replacement of a node, addition or
- * deletion. Note that, as described in {@link #ONE}, this may have counterintuitive
- * interactions when viewed from a <i>binding aware</i> application, in particular when it
- * pertains to lists.
- *
- */
- BASE,
- /**
- * Represent a change (addition,replacement,deletion) of the node or one of its direct
- * children.
- *
- * <p>
- * Note that this is done in the <i>binding independent</i> data tree and so the behavior
- * might be counterintuitive when used with <i>binding aware</i> interfaces particularly
- * when it comes to lists. The list itself is a node in the <i>binding independent</i> tree,
- * which means that if you want to listen on new elements you must listen on the list itself
- * with the scope of {@link #ONE}.
- *
- * <p>
- * As an example, in the below YANG snippet, listening on <tt>node</tt> with scope
- * {@link #ONE} would tell you if the <tt>node-connector</tt> list was created or deleted,
- * but not when elements were added or removed from the list assuming the list itself
- * already existed.
- *
- * <pre>
- * container nodes {
- * list node {
- * key "id";
- * leaf id {
- * type string;
- * }
- * list node-connector {
- * leaf id {
- * type string;
- * }
- * }
- * }
- * }
- * </pre>
- *
- * <p>
- * This scope is superset of {@link #BASE}.
- *
- */
- ONE,
- /**
- * Represents a change of the node or any of or any of its child nodes,
- * direct and nested.
- *
- * <p>
- * This scope is superset of {@link #ONE} and {@link #BASE}.
- *
- */
- SUBTREE
- }
+public interface AsyncDataBroker<P extends Path<P>, D> extends AsyncDataTransactionFactory<P, D> {
@Override
AsyncReadOnlyTransaction<P, D> newReadOnlyTransaction();
@Override
AsyncWriteTransaction<P, D> newWriteOnlyTransaction();
-
- /**
- * Registers a {@link AsyncDataChangeListener} to receive
- * notifications when data changes under a given path in the conceptual data
- * tree.
- *
- * <p>
- * You are able to register for notifications for any node or subtree
- * which can be reached via the supplied path.
- *
- * <p>
- * If path type <code>P</code> allows it, you may specify paths up to the leaf nodes
- * then it is possible to listen on leaf nodes.
- *
- * <p>
- * You are able to register for data change notifications for a subtree even
- * if it does not exist. You will receive notification once that node is created.
- *
- * <p>
- * If there is any preexisting data in data tree on path for which you are
- * registering, you will receive initial data change event, which will
- * contain all preexisting data, marked as created.
- *
- * <p>
- * You are also able to specify the scope of the changes you want to be
- * notified.
- *
- * <p>
- * Supported scopes are:
- * <ul>
- * <li>{@link DataChangeScope#BASE} - notification events will only be
- * triggered when a node referenced by path is created, removed or replaced.
- * <li>{@link DataChangeScope#ONE} - notifications events will only be
- * triggered when a node referenced by path is created, removed or replaced,
- * or any or any of its immediate children are created, updated or removed.
- * <li>{@link DataChangeScope#SUBTREE} - notification events will be
- * triggered when a node referenced by the path is created, removed
- * or replaced or any of the children in its subtree are created, removed
- * or replaced.
- * </ul>
- * See {@link DataChangeScope} for examples.
- *
- * <p>
- * This method returns a {@link ListenerRegistration} object. To
- * "unregister" your listener for changes call the "close" method on this
- * returned object.
- *
- * <p>
- * You MUST call close when you no longer need to receive notifications
- * (such as during shutdown or for example if your bundle is shutting down).
- *
- * @param store
- * Logical Data Store - Logical Datastore you want to listen for
- * changes in. For example
- * {@link LogicalDatastoreType#OPERATIONAL} or
- * {@link LogicalDatastoreType#CONFIGURATION}
- * @param path
- * Path (subtree identifier) on which client listener will be
- * invoked.
- * @param listener
- * Instance of listener which should be invoked on
- * @param triggeringScope
- * Scope of change which triggers callback.
- * @return Listener registration object, which may be used to unregister
- * your listener using {@link ListenerRegistration#close()} to stop
- * delivery of change events.
- */
- @Deprecated
- default ListenerRegistration<L> registerDataChangeListener(LogicalDatastoreType store, P path, L listener,
- DataChangeScope triggeringScope) {
- throw new UnsupportedOperationException("Data change listeners are no longer supported.");
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.api.data;
-
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.yangtools.concepts.Immutable;
-import org.opendaylight.yangtools.concepts.Path;
-
-/**
- * An event which contains a capture of changes in a data subtree.
- *
- * <p>
- * Represents a notification indicating that some data at or under a particular
- * path has changed. The notification contains a capture of the changes in the data
- * subtree. This event is triggered by successful application of modifications
- * from a transaction on the global data tree. Use the
- * {@link AsyncDataBroker#registerDataChangeListener(LogicalDatastoreType, Path, AsyncDataChangeListener,
- * AsyncDataBroker.DataChangeScope)}
- * method to register a listener for data change events.
- *
- * <p>
- * A listener will only receive notifications for changes to data under the path
- * they register for. See
- * {@link AsyncDataBroker#registerDataChangeListener(LogicalDatastoreType, Path, AsyncDataChangeListener,
- * AsyncDataBroker.DataChangeScope)}
- * to learn more about registration scopes.
- *
- * <p>
- * The entire subtree under the path will be provided via instance methods of Data
- * Change Event even if just a leaf node changes.
- *
- * <p>
- * <b>Implementation Note:</b> This interface is not intended to be implemented
- * by users of MD-SAL, but only to be consumed by them.
- *
- * @param <P>
- * Type of path (subtree identifier), which represents location in
- * tree
- * @param <D>
- * Type of data (payload), which represents data payload
- */
-public interface AsyncDataChangeEvent<P extends Path<P>, D> extends Immutable {
- /**
- * Returns a map of paths and newly created objects, which were introduced by
- * this change into conceptual data tree, if no new objects were introduced
- * this map will be empty.
- *
- * <p>
- * This map contains all data tree nodes (and paths to them) which were created
- * and are in the scope of listener registration. The data tree nodes
- * contain their whole subtree with their current state.
- *
- * @return map of paths and newly created objects
- */
- Map<P, D> getCreatedData();
-
- /**
- * Returns a map of paths and objects which were updated by this change in the
- * conceptual data tree if no existing objects were updated
- * this map will be empty.
- *
- * <p>
- * This map contains all data tree nodes (and paths to them) which were updated
- * and are in the scope of listener registration. The data tree nodes
- * contain their whole subtree with their current state.
- *
- * <p>
- * A Node is considered updated if it contents were replaced or one of its
- * children was created, removed or updated.
- *
- * <p>
- * Original state of the updated data tree nodes is in
- * {@link #getOriginalData()} stored with same path.
- *
- * @return map of paths and newly created objects
- */
- Map<P, D> getUpdatedData();
-
- /**
- * Returns an immutable set of removed paths.
- *
- * <p>
- * This set contains the paths to the data tree nodes which are in the scope
- * of the listener registration that have been removed.
- *
- * <p>
- * Original state of the removed data tree nodes is in
- * {@link #getOriginalData()} stored with same path.
- *
- * @return set of removed paths
- */
- Set<P> getRemovedPaths();
-
- /**
- * Returns an immutable map of updated or removed paths and their original
- * states prior to this change.
- *
- *<p>
- * This map contains the original version of the data tree nodes (and paths
- * to them), which are in the scope of the listener registration.
- *
- * @return map of paths and original state of updated and removed objects.
- */
- Map<P, D> getOriginalData();
-
- /**
- * Returns an immutable stable view of data state, which captures the state of
- * data store before the reported change.
- *
- *<p>
- * The view is rooted at the point where the listener, to which the event is
- * being delivered, was registered.
- *
- * <p>
- * If listener used a wildcarded path (if supported by path type) during
- * registration for change listeners this method returns null, and original
- * state can be accessed only via {@link #getOriginalData()}
- *
- * @return Stable view of data before the change happened, rooted at the
- * listener registration path.
- *
- */
- D getOriginalSubtree();
-
- /**
- * Returns an immutable stable view of data, which captures the state of data
- * store after the reported change.
- *
- * <p>
- * The view is rooted at the point where the listener, to which the event is
- * being delivered, was registered.
- *
- * <p>
- * If listener used a wildcarded path (if supported by path type) during
- * registration for change listeners this method returns null, and state
- * can be accessed only via {@link #getCreatedData()},
- * {@link #getUpdatedData()}, {@link #getRemovedPaths()}
- *
- * @return Stable view of data after the change happened, rooted at the
- * listener registration path.
- */
- D getUpdatedSubtree();
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.api.data;
-
-import java.util.EventListener;
-import org.opendaylight.yangtools.concepts.Path;
-
-/**
- * Listener of data change events on particular subtree.
- *
- * <p>
- * User-supplied implementations of this listener interface MUST register via
- * {@link AsyncDataBroker#registerDataChangeListener(LogicalDatastoreType, Path, AsyncDataChangeListener,
- * AsyncDataBroker.DataChangeScope)}
- * in order to start receiving data change events, which capture state changes
- * in a subtree.
- *
- * <p>
- * <b>Implementation Note:</b> This interface is intended to be implemented
- * by users of MD-SAL.
- *
- * @param <P>
- * Type of path (subtree identifier), which represents location in
- * tree
- * @param <D>
- * Type of data (payload), which represents data payload
- */
-@Deprecated
-public interface AsyncDataChangeListener<P extends Path<P>, D> extends EventListener {
- /**
- * Invoked when there is data change for the particular path, which was used to
- * register this listener.
- *
- * <p>
- * This method may be also invoked during registration of the listener if
- * there is any preexisting data in the conceptual data tree for supplied path.
- * This initial event will contain all preexisting data as created.
- *
- * <p>
- * <b>Note</b>: This method may be invoked from a shared thread pool.
- * <ul>
- * <li>Implementations <b>SHOULD NOT</b> perform CPU-intensive operations on the calling thread.
- * <li>Implementations <b>MUST NOT block the calling thread</b> - to do so could lead to deadlock
- * </ul>
- * scenarios.
- *
- *<br>
- *
- * @param change
- * Data Change Event being delivered.
- */
- void onDataChanged(AsyncDataChangeEvent<P, D> change);
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.api.data;
-
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.yangtools.concepts.Path;
-
-public interface DataChange<P extends Path<P>, D> {
-
- /**
- * Returns a map of paths and newly created objects.
- *
- * @return map of paths and newly created objects
- */
- Map<P, D> getCreatedOperationalData();
-
- /**
- * Returns a map of paths and newly created objects.
- *
- * @return map of paths and newly created objects
- */
- Map<P, D> getCreatedConfigurationData();
-
- /**
- * Returns a map of paths and respective updated objects after update.
- *
- * <p>
- * Original state of the object is in
- * {@link #getOriginalOperationalData()}
- *
- * @return map of paths and newly created objects
- */
- Map<P, D> getUpdatedOperationalData();
-
- /**
- * Returns a map of paths and respective updated objects after update.
- *
- * <p>
- * Original state of the object is in
- * {@link #getOriginalConfigurationData()}
- *
- * @return map of paths and newly created objects
- */
- Map<P, D> getUpdatedConfigurationData();
-
-
-
- /**
- * Returns a set of paths of removed objects.
- *
- * <p>
- * Original state of the object is in
- * {@link #getOriginalConfigurationData()}
- *
- * @return map of paths and newly created objects
- */
- Set<P> getRemovedConfigurationData();
-
- /**
- * Returns a set of paths of removed objects.
- *
- * <p>
- * Original state of the object is in
- * {@link #getOriginalOperationalData()}
- *
- * @return map of paths and newly created objects
- */
- Set<P> getRemovedOperationalData();
-
- /**
- * Return a map of paths and original state of updated and removed objectd.
- *
- * @return map of paths and original state of updated and removed objectd.
- */
- Map<P, D> getOriginalConfigurationData();
-
- /**
- * Return a map of paths and original state of updated and removed objectd.
- *
- * @return map of paths and original state of updated and removed objectd.
- */
- Map<P, D> getOriginalOperationalData();
-}
*/
package org.opendaylight.controller.cluster.databroker.compat;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ForwardingObject;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.compat.LegacyDOMStoreAdapter;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
return legacyChain.get();
}
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path, final DOMDataChangeListener listener,
- final DataChangeScope triggeringScope) {
- org.opendaylight.mdsal.dom.spi.store.DOMStore potentialStore = delegate().getTxFactories().get(convert(store));
- checkState(potentialStore != null, "Requested logical data store is not available.");
- checkState(potentialStore instanceof DistributedDataStoreInterface,
- "Requested logical data store does not support DataChangeListener.");
- return ((DistributedDataStoreInterface)potentialStore).registerChangeListener(path, listener, triggeringScope);
- }
-
private static org.opendaylight.mdsal.common.api.LogicalDatastoreType convert(LogicalDatastoreType datastoreType) {
return org.opendaylight.mdsal.common.api.LogicalDatastoreType.valueOf(datastoreType.name());
}
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.slf4j.Logger;
this.closeable = closeable;
}
- @SuppressWarnings("unchecked")
- @Override
- @Deprecated
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- ListenerRegistration<L> registerChangeListener(
- final YangInstanceIdentifier path, final L listener,
- final AsyncDataBroker.DataChangeScope scope) {
-
- Preconditions.checkNotNull(path, "path should not be null");
- Preconditions.checkNotNull(listener, "listener should not be null");
-
- LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-
- String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
-
- final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
- new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
- listenerRegistrationProxy.init(path, scope);
-
- return listenerRegistrationProxy;
- }
-
@Override
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
final YangInstanceIdentifier treeId, final L listener) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.Props;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
-import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataTreeListenerInfo;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
-import org.opendaylight.controller.cluster.datastore.messages.GetInfo;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Actor for a DataChangeListener.
- *
- * @deprecated Replaced by {@link DataTreeChangeListener}
- */
-@Deprecated
-public class DataChangeListener extends AbstractUntypedActor {
- private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private final YangInstanceIdentifier registeredPath;
- private boolean notificationsEnabled = false;
- private long notificationCount;
-
- public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
- final YangInstanceIdentifier registeredPath) {
- this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
- this.registeredPath = Preconditions.checkNotNull(registeredPath);
- }
-
- @Override
- public void handleReceive(Object message) {
- if (message instanceof DataChanged) {
- dataChanged(message);
- } else if (message instanceof EnableNotification) {
- enableNotification((EnableNotification) message);
- } else if (message instanceof GetInfo) {
- getSender().tell(new DataTreeListenerInfo(listener.toString(), registeredPath.toString(),
- notificationsEnabled, notificationCount), getSelf());
- } else {
- unknownMessage(message);
- }
- }
-
- private void enableNotification(EnableNotification message) {
- notificationsEnabled = message.isEnabled();
- LOG.debug("{} notifications for listener {}", notificationsEnabled ? "Enabled" : "Disabled",
- listener);
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void dataChanged(Object message) {
-
- // Do nothing if notifications are not enabled
- if (!notificationsEnabled) {
- LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
- return;
- }
-
- DataChanged reply = (DataChanged) message;
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = reply.getChange();
-
- LOG.debug("Sending change notification {} to listener {}", change, listener);
-
- notificationCount++;
-
- try {
- this.listener.onDataChanged(change);
- } catch (RuntimeException e) {
- LOG.error(String.format("Error notifying listener %s", this.listener), e);
- }
-
- if (isValidSender(getSender())) {
- getSender().tell(DataChangedReply.INSTANCE, getSelf());
- }
- }
-
- public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
- final YangInstanceIdentifier registeredPath) {
- return Props.create(DataChangeListener.class, listener, registeredPath);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * DataChangeListenerProxy represents a single remote DataChangeListener.
- */
-public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private final ActorSelection dataChangeListenerActor;
-
- public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
- this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor,
- "dataChangeListenerActor should not be null");
- }
-
- @Override
- public void onDataChanged(
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- dataChangeListenerActor.tell(new DataChanged(change), ActorRef.noSender());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.PoisonPill;
-import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * ListenerRegistrationProxy acts as a proxy for a ListenerRegistration that was done on a remote shard.
- *
- * <p>
- * Registering a DataChangeListener on the Data Store creates a new instance of the ListenerRegistrationProxy
- * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
- */
-@SuppressWarnings("rawtypes")
-public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
-
- private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerRegistrationProxy.class);
-
- private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private final String shardName;
- private final ActorContext actorContext;
- private ActorRef dataChangeListenerActor;
- private volatile ActorSelection listenerRegistrationActor;
- private boolean closed = false;
-
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistrationProxy(final String shardName, final ActorContext actorContext,
- final L listener) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.actorContext = Preconditions.checkNotNull(actorContext);
- this.listener = Preconditions.checkNotNull(listener);
- }
-
- @VisibleForTesting
- ActorSelection getListenerRegistrationActor() {
- return listenerRegistrationActor;
- }
-
- @VisibleForTesting
- ActorRef getDataChangeListenerActor() {
- return dataChangeListenerActor;
- }
-
- @Override
- public Object getInstance() {
- return listener;
- }
-
- private void setListenerRegistrationActor(final ActorSelection listenerRegistrationActor) {
- if (listenerRegistrationActor == null) {
- return;
- }
-
- boolean sendCloseMessage = false;
- synchronized (this) {
- if (closed) {
- sendCloseMessage = true;
- } else {
- this.listenerRegistrationActor = listenerRegistrationActor;
- }
- }
-
- if (sendCloseMessage) {
- listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
- ActorRef.noSender());
- }
- }
-
- public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
-
- dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener, path).withDispatcher(actorContext.getNotificationDispatcherPath()));
-
- Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
- findFuture.onComplete(new OnComplete<ActorRef>() {
- @Override
- public void onComplete(final Throwable failure, final ActorRef shard) {
- if (failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
- + "cannot be registered", shardName, listener, path);
- } else if (failure != null) {
- LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
- + "cannot be registered: {}", shardName, listener, path, failure);
- } else {
- doRegistration(shard, path, scope);
- }
- }
- }, actorContext.getClientDispatcher());
- }
-
- private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path,
- final DataChangeScope scope) {
-
- Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor, scope,
- listener instanceof ClusteredDOMDataChangeListener),
- actorContext.getDatastoreContext().getShardInitializationTimeout());
-
- future.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(final Throwable failure, final Object result) {
- if (failure != null) {
- LOG.error("Failed to register DataChangeListener {} at path {}",
- listener, path.toString(), failure);
- } else {
- RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
- setListenerRegistrationActor(actorContext.actorSelection(
- reply.getListenerRegistrationPath()));
- }
- }
- }, actorContext.getClientDispatcher());
- }
-
- @Override
- public void close() {
-
- boolean sendCloseMessage;
- synchronized (this) {
- sendCloseMessage = !closed && listenerRegistrationActor != null;
- closed = true;
- }
-
- if (sendCloseMessage) {
- listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
- ActorRef.noSender());
- listenerRegistrationActor = null;
- }
-
- if (dataChangeListenerActor != null) {
- dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- dataChangeListenerActor = null;
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.datastore.actors.DataTreeNotificationListenerRegistrationActor;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-final class DataChangeListenerSupport extends AbstractDataListenerSupport<
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
-
- DataChangeListenerSupport(final Shard shard) {
- super(shard);
- }
-
- @Override
- void doRegistration(final RegisterChangeListener message, final ActorRef registrationActor) {
- final ActorSelection listenerActor = processListenerRegistrationMessage(message);
-
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(listenerActor);
-
- log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
-
- final ShardDataTree shardDataTree = getShard().getDataStore();
- shardDataTree.registerDataChangeListener(message.getPath(), listener, message.getScope(),
- shardDataTree.readCurrentData(), registration -> registrationActor.tell(
- new DataTreeNotificationListenerRegistrationActor.SetRegistration(registration, () ->
- removeListenerActor(listenerActor)), ActorRef.noSender()));
- }
-
- @Override
- protected String logName() {
- return "registerDataChangeListener";
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.base.Optional;
-import java.util.function.Consumer;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
-import org.opendaylight.controller.sal.core.compat.DataChangeListenerRegistration;
-import org.opendaylight.controller.sal.core.compat.ListenerTree;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Default implementation of ShardDataChangeListenerPublisher that directly generates and publishes
- * notifications for DataChangeListeners.
- *
- * @author Thomas Pantelis
- */
-@NotThreadSafe
-final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeListenerPublisher,
- NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
-
- private final ListenerTree dataChangeListenerTree = ListenerTree.create();
- private final String logContext;
-
- DefaultShardDataChangeListenerPublisher(String logContext) {
- this.logContext = logContext;
- }
-
- @Override
- public void submitNotification(final DataChangeListenerRegistration<?> listener,
- final DOMImmutableDataChangeEvent notification) {
- LOG.debug("{}: Notifying listener {} about {}", logContext, listener.getInstance(), notification);
-
- listener.getInstance().onDataChanged(notification);
- }
-
- @Override
- public void submitNotifications(final DataChangeListenerRegistration<?> listener,
- final Iterable<DOMImmutableDataChangeEvent> notifications) {
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
- LOG.debug("{}: Notifying listener {} about {}", logContext, instance, notifications);
-
- for (DOMImmutableDataChangeEvent n : notifications) {
- instance.onDataChanged(n);
- }
- }
-
- @Override
- public void publishChanges(DataTreeCandidate candidate) {
- ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
- }
-
- @Override
- public void registerDataChangeListener(YangInstanceIdentifier path,
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
- Optional<DataTreeCandidate> initialState,
- Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
- onRegistration) {
- final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
-
- onRegistration.accept(registration);
-
- if (initialState.isPresent()) {
- notifySingleListener(path, listener, scope, initialState.get(), logContext);
- }
- }
-
- static void notifySingleListener(final YangInstanceIdentifier path,
- final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
- final DataChangeScope scope, final DataTreeCandidate initialState, String logContext) {
- DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(logContext);
- publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { /* NOOP */ });
- publisher.publishChanges(initialState);
- }
-}
package org.opendaylight.controller.cluster.datastore;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
* The public interface exposed vi a DistributedDataStore via the OSGi registry.
*/
public interface DistributedDataStoreInterface extends DOMStore {
- @Deprecated
- <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L>
- registerChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope);
-
ActorContext getActorContext();
}
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
- private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
-
private ShardSnapshot restoreFromSnapshot;
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
- ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
+ treeChangeListenerPublisher, name, frontendMetadata);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
- dataChangeListenerPublisher, name, frontendMetadata);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
handleAbortTransaction(AbortTransaction.fromSerializable(message));
} else if (CloseTransactionChain.isSerializedType(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof RegisterDataTreeChangeListener) {
treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
} else if (message instanceof UpdateSchemaContext) {
protected void onStateChanged() {
boolean isLeader = isLeader();
boolean hasLeader = hasLeader();
- changeSupport.onLeadershipChange(isLeader, hasLeader);
treeChangeSupport.onLeadershipChange(isLeader, hasLeader);
// If this actor is no longer the leader close all the transaction chains
@Override
protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
- .dataChangeListenerActors(changeSupport.getListenerActors())
.commitCohortActors(store.getCohortActors());
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.base.Optional;
-import java.util.function.Consumer;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-/**
- * Interface for a class that generates and publishes notifications for DataChangeListeners.
- *
- * @author Thomas Pantelis
- */
-interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
- void registerDataChangeListener(YangInstanceIdentifier path,
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
- Optional<DataTreeCandidate> initialState,
- Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
- onRegistration);
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorContext;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.google.common.base.Optional;
-import java.util.function.Consumer;
-import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-/**
- * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
- * of data change notifications to an actor.
- *
- * @author Thomas Pantelis
- */
-@NotThreadSafe
-class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
- implements ShardDataChangeListenerPublisher {
-
- ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName, String logContext) {
- super(actorContext, actorName, logContext);
- }
-
- @Override
- public void registerDataChangeListener(YangInstanceIdentifier path,
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
- Optional<DataTreeCandidate> initialState,
- Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
- onRegistration) {
- publisherActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState,
- onRegistration), ActorRef.noSender());
- }
-
- @Override
- protected Props props() {
- return ShardDataChangePublisherActor.props(actorName(), logContext());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.Props;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.function.Consumer;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-
-/**
- * Actor used to generate and publish DataChange notifications.
- *
- * @author Thomas Pantelis
- */
-public final class ShardDataChangePublisherActor
- extends ShardDataTreeNotificationPublisherActor<ShardDataChangeListenerPublisher> {
-
- private ShardDataChangePublisherActor(final String name, final String logContext) {
- super(new DefaultShardDataChangeListenerPublisher(logContext), name, logContext);
- }
-
- @Override
- protected void handleReceive(final Object message) {
- if (message instanceof RegisterListener) {
- RegisterListener reg = (RegisterListener)message;
- if (reg.initialState.isPresent()) {
- DefaultShardDataChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, reg.scope,
- reg.initialState.get(), logContext());
- }
-
- publisher().registerDataChangeListener(reg.path, reg.listener, reg.scope, Optional.absent(),
- reg.onRegistration);
- } else {
- super.handleReceive(message);
- }
- }
-
- static Props props(final String name, final String logContext) {
- return Props.create(ShardDataChangePublisherActor.class, name, logContext);
- }
-
- static class RegisterListener {
- private final YangInstanceIdentifier path;
- private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
- private final DataChangeScope scope;
- private final Optional<DataTreeCandidate> initialState;
- private final Consumer<ListenerRegistration<
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration;
-
- RegisterListener(final YangInstanceIdentifier path,
- final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
- final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
- final Consumer<ListenerRegistration<
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> onRegistration) {
- this.path = Preconditions.checkNotNull(path);
- this.listener = Preconditions.checkNotNull(listener);
- this.scope = Preconditions.checkNotNull(scope);
- this.initialState = Preconditions.checkNotNull(initialState);
- this.onRegistration = Preconditions.checkNotNull(onRegistration);
- }
- }
-}
import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
- private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
private final Collection<ShardDataTreeMetadata<?>> metadata;
private final DataTree dataTree;
private final String logContext;
ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
- final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+ final String logContext,
final ShardDataTreeMetadata<?>... metadata) {
this.dataTree = Preconditions.checkNotNull(dataTree);
updateSchemaContext(schemaContext);
this.shard = Preconditions.checkNotNull(shard);
this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
- this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
this.logContext = Preconditions.checkNotNull(logContext);
this.metadata = ImmutableList.copyOf(metadata);
tip = dataTree;
ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
final YangInstanceIdentifier root,
final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
- final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+ final String logContext,
final ShardDataTreeMetadata<?>... metadata) {
- this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher,
- dataChangeListenerPublisher, logContext, metadata);
+ this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, logContext, metadata);
}
private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) {
@VisibleForTesting
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
- new DefaultShardDataTreeChangeListenerPublisher(""),
- new DefaultShardDataChangeListenerPublisher(""), "");
+ new DefaultShardDataTreeChangeListenerPublisher(""), "");
}
final String logContext() {
@VisibleForTesting
public void notifyListeners(final DataTreeCandidate candidate) {
treeChangeListenerPublisher.publishChanges(candidate);
- dataChangeListenerPublisher.publishChanges(candidate);
}
/**
replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
}
- void registerDataChangeListener(final YangInstanceIdentifier path,
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
- final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
- final Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
- onRegistration) {
- dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
- }
-
Optional<DataTreeCandidate> readCurrentData() {
final java.util.Optional<NormalizedNode<?, ?>> currentState =
dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.concurrent.ExceptionMapper;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
return new DOMStoreTransactionAdapter(delegate().newReadWriteTransaction());
}
- @Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L>
- registerChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
- return delegate().registerChangeListener(path, listener, scope);
- }
-
@Override
public DOMStoreTransactionChain createTransactionChain() {
final org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain delegateChain =
*/
public interface ShardDataTreeListenerInfoMXBean {
List<DataTreeListenerInfo> getDataTreeChangeListenerInfo();
-
- @Deprecated
- List<DataTreeListenerInfo> getDataChangeListenerInfo();
}
return getListenerActorsInfo(getState().getTreeChangeListenerActors());
}
- @Override
- public List<DataTreeListenerInfo> getDataChangeListenerInfo() {
- return getListenerActorsInfo(getState().getDataChangeListenerActors());
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private OnDemandShardState getState() {
try {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
-import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
-import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
-import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChanged implements Externalizable {
- private static final long serialVersionUID = 1L;
-
- private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
-
- public DataChanged() {
- }
-
- public DataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- this.change = change;
- }
-
- public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
- return change;
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- // Read the version
- in.readShort();
-
- NormalizedNodeDataInput streamReader = NormalizedNodeInputOutput.newDataInputWithoutValidation(in);
-
- // Note: the scope passed to builder is not actually used.
- Builder builder = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE);
-
- // Read created data
-
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
- NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
- builder.addCreated(path, node);
- }
-
- // Read updated data
-
- size = in.readInt();
- for (int i = 0; i < size; i++) {
- YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
- NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
- NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
- builder.addUpdated(path, before, after);
- }
-
- // Read removed data
-
- size = in.readInt();
- for (int i = 0; i < size; i++) {
- YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
- NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
- builder.addRemoved(path, node);
- }
-
- // Read original subtree
-
- boolean present = in.readBoolean();
- if (present) {
- builder.setBefore(streamReader.readNormalizedNode());
- }
-
- // Read updated subtree
-
- present = in.readBoolean();
- if (present) {
- builder.setAfter(streamReader.readNormalizedNode());
- }
-
- change = builder.build();
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeShort(DataStoreVersions.CURRENT_VERSION);
-
- NormalizedNodeDataOutput streamWriter = NormalizedNodeInputOutput.newDataOutput(out);
-
- // Write created data
-
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
- out.writeInt(createdData.size());
- for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
- streamWriter.writeYangInstanceIdentifier(e.getKey());
- streamWriter.writeNormalizedNode(e.getValue());
- }
-
- // Write updated data
-
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
- out.writeInt(updatedData.size());
- for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
- streamWriter.writeYangInstanceIdentifier(e.getKey());
- streamWriter.writeNormalizedNode(originalData.get(e.getKey()));
- streamWriter.writeNormalizedNode(e.getValue());
- }
-
- // Write removed data
-
- Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
- out.writeInt(removed.size());
- for (YangInstanceIdentifier path: removed) {
- streamWriter.writeYangInstanceIdentifier(path);
- streamWriter.writeNormalizedNode(originalData.get(path));
- }
-
- // Write original subtree
-
- NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
- out.writeBoolean(originalSubtree != null);
- if (originalSubtree != null) {
- streamWriter.writeNormalizedNode(originalSubtree);
- }
-
- // Write original subtree
-
- NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
- out.writeBoolean(updatedSubtree != null);
- if (updatedSubtree != null) {
- streamWriter.writeNormalizedNode(updatedSubtree);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-public final class DataChangedReply {
- public static final DataChangedReply INSTANCE = new DataChangedReply();
-
- private DataChangedReply() {
- }
-}
*/
public class OnDemandShardState extends OnDemandRaftState {
private Collection<ActorSelection> treeChangeListenerActors;
- private Collection<ActorSelection> dataChangeListenerActors;
private Collection<ActorRef> commitCohortActors;
public Collection<ActorSelection> getTreeChangeListenerActors() {
return treeChangeListenerActors;
}
- public Collection<ActorSelection> getDataChangeListenerActors() {
- return dataChangeListenerActors;
- }
-
public Collection<ActorRef> getCommitCohortActors() {
return commitCohortActors;
}
return self();
}
- public Builder dataChangeListenerActors(Collection<ActorSelection> actors) {
- state.dataChangeListenerActors = actors;
- return self();
- }
-
public Builder commitCohortActors(Collection<ActorRef> actors) {
state.commitCohortActors = actors;
return self();
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import akka.actor.ActorPath;
-import akka.actor.ActorRef;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-public class RegisterChangeListener implements ListenerRegistrationMessage {
- private final YangInstanceIdentifier path;
- private final ActorRef dataChangeListenerActor;
- private final AsyncDataBroker.DataChangeScope scope;
- private final boolean registerOnAllInstances;
-
- public RegisterChangeListener(YangInstanceIdentifier path, ActorRef dataChangeListenerActor,
- AsyncDataBroker.DataChangeScope scope, boolean registerOnAllInstances) {
- this.path = path;
- this.dataChangeListenerActor = dataChangeListenerActor;
- this.scope = scope;
- this.registerOnAllInstances = registerOnAllInstances;
- }
-
- @Override
- public YangInstanceIdentifier getPath() {
- return path;
- }
-
- public AsyncDataBroker.DataChangeScope getScope() {
- return scope;
- }
-
- @Override
- public ActorPath getListenerActorPath() {
- return dataChangeListenerActor.path();
- }
-
- @Override
- public boolean isRegisterOnAllInstances() {
- return registerOnAllInstances;
- }
-
- @Override
- public String toString() {
- return "RegisterChangeListener [path=" + path + ", scope=" + scope + ", registerOnAllInstances="
- + registerOnAllInstances + ", dataChangeListenerActor=" + dataChangeListenerActor + "]";
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public class DataChangeListenerProxyTest extends AbstractActorTest {
-
- private static class MockDataChangedEvent
- implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap<>();
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap<>();
- Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap<>();
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
- createdData.put(YangInstanceIdentifier.EMPTY, CompositeModel.createDocumentOne(
- CompositeModel.createTestContext()));
- return createdData;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
- updatedData.put(YangInstanceIdentifier.EMPTY, CompositeModel.createTestContainer());
- return updatedData;
-
- }
-
- @Override
- public Set<YangInstanceIdentifier> getRemovedPaths() {
- Set<YangInstanceIdentifier> ids = new HashSet<>();
- ids.add(CompositeModel.TEST_PATH);
- return ids;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
- originalData.put(YangInstanceIdentifier.EMPTY, CompositeModel.createFamily());
- return originalData;
- }
-
- @Override public NormalizedNode<?, ?> getOriginalSubtree() {
- return CompositeModel.createFamily() ;
- }
-
- @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
- return CompositeModel.createTestContainer();
- }
- }
-
-
- @Test
- public void testOnDataChanged() throws Exception {
- final ActorRef actorRef = getSystem().actorOf(MessageCollectorActor.props());
-
- DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
- getSystem().actorSelection(actorRef.path()));
-
- dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
-
- //Check if it was received by the remote actor
- ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(
- Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
- Object messages = testContext
- .executeOperation(actorRef, MessageCollectorActor.GET_ALL_MESSAGES);
-
- Assert.assertNotNull(messages);
-
- Assert.assertTrue(messages instanceof List);
-
- List<?> listMessages = (List<?>) messages;
-
- Assert.assertEquals(1, listMessages.size());
-
- Assert.assertTrue(listMessages.get(0).getClass().equals(DataChanged.class));
-
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.Futures;
-import akka.testkit.javadsl.TestKit;
-import akka.util.Timeout;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.concurrent.TimeUnit;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.common.actor.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Unit tests for DataChangeListenerRegistrationProxy.
- *
- * @author Thomas Pantelis
- */
-public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
-
- @SuppressWarnings("unchecked")
- private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockListener =
- Mockito.mock(AsyncDataChangeListener.class);
-
- @Test
- public void testGetInstance() throws Exception {
- try (DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- "shard", Mockito.mock(ActorContext.class), mockListener)) {
- Assert.assertEquals(mockListener, proxy.getInstance());
- }
- }
-
- @Test(timeout = 10000)
- public void testSuccessfulRegistration() {
- new TestKit(getSystem()) {
- {
- ActorContext actorContext = new ActorContext(getSystem(), getRef(),
- mock(ClusterWrapper.class), mock(Configuration.class));
-
- final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- "shard-1", actorContext, mockListener);
-
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread(() -> proxy.init(path, scope)).start();
-
- FiniteDuration timeout = duration("5 seconds");
- FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
- Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
-
- reply(new LocalShardFound(getRef()));
-
- RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
- Assert.assertEquals("getPath", path, registerMsg.getPath());
- Assert.assertEquals("getScope", scope, registerMsg.getScope());
- Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
-
- reply(new RegisterDataTreeNotificationListenerReply(getRef()));
-
- for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
-
- Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
- proxy.getListenerRegistrationActor());
-
- watch(proxy.getDataChangeListenerActor());
-
- proxy.close();
-
- // The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
-
- // The DataChangeListener actor should be terminated
- expectMsgClass(timeout, Terminated.class);
-
- proxy.close();
-
- expectNoMsg();
- }
- };
- }
-
- @Test(timeout = 10000)
- public void testSuccessfulRegistrationForClusteredListener() {
- new TestKit(getSystem()) {
- {
- ActorContext actorContext = new ActorContext(getSystem(), getRef(),
- mock(ClusterWrapper.class), mock(Configuration.class));
-
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> mockClusteredListener =
- Mockito.mock(ClusteredDOMDataChangeListener.class);
-
- final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- "shard-1", actorContext, mockClusteredListener);
-
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread(() -> proxy.init(path, scope)).start();
-
- FiniteDuration timeout = duration("5 seconds");
- FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
- Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
-
- reply(new LocalShardFound(getRef()));
-
- RegisterChangeListener registerMsg = expectMsgClass(timeout, RegisterChangeListener.class);
- Assert.assertEquals("getPath", path, registerMsg.getPath());
- Assert.assertEquals("getScope", scope, registerMsg.getScope());
- Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
-
- reply(new RegisterDataTreeNotificationListenerReply(getRef()));
-
- for (int i = 0; i < 20 * 5 && proxy.getListenerRegistrationActor() == null; i++) {
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
-
- Assert.assertEquals("getListenerRegistrationActor", getSystem().actorSelection(getRef().path()),
- proxy.getListenerRegistrationActor());
-
- watch(proxy.getDataChangeListenerActor());
-
- proxy.close();
-
- // The listener registration actor should get a Close message
- expectMsgClass(timeout, CloseDataTreeNotificationListenerRegistration.class);
-
- // The DataChangeListener actor should be terminated
- expectMsgClass(timeout, Terminated.class);
-
- proxy.close();
-
- expectNoMsg();
- }
- };
- }
-
- @Test(timeout = 10000)
- public void testLocalShardNotFound() {
- new TestKit(getSystem()) {
- {
- ActorContext actorContext = new ActorContext(getSystem(), getRef(),
- mock(ClusterWrapper.class), mock(Configuration.class));
-
- final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- "shard-1", actorContext, mockListener);
-
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread(() -> proxy.init(path, scope)).start();
-
- FiniteDuration timeout = duration("5 seconds");
- FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
- Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
-
- reply(new LocalShardNotFound("shard-1"));
-
- expectNoMsg(duration("1 seconds"));
-
- proxy.close();
- }
- };
- }
-
- @Test(timeout = 10000)
- public void testLocalShardNotInitialized() {
- new TestKit(getSystem()) {
- {
- ActorContext actorContext = new ActorContext(getSystem(), getRef(),
- mock(ClusterWrapper.class), mock(Configuration.class));
-
- final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- "shard-1", actorContext, mockListener);
-
- final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
- final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread(() -> proxy.init(path, scope)).start();
-
- FiniteDuration timeout = duration("5 seconds");
- FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
- Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
-
- reply(new NotInitializedException("not initialized"));
-
- expectNoMsg(duration("1 seconds"));
- proxy.close();
- }
- };
- }
-
- @Test
- public void testFailedRegistration() {
- new TestKit(getSystem()) {
- {
- ActorSystem mockActorSystem = mock(ActorSystem.class);
-
- ActorRef mockActor = getSystem().actorOf(Props.create(DoNothingActor.class),
- "testFailedRegistration");
- doReturn(mockActor).when(mockActorSystem).actorOf(any(Props.class));
- ExecutionContextExecutor executor = ExecutionContexts.fromExecutor(
- MoreExecutors.directExecutor());
-
-
- ActorContext actorContext = mock(ActorContext.class);
-
- doReturn(executor).when(actorContext).getClientDispatcher();
-
- String shardName = "shard-1";
- final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- shardName, actorContext, mockListener);
-
- doReturn(mockActorSystem).when(actorContext).getActorSystem();
- doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
- doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
- doReturn(Futures.failed(new RuntimeException("mock")))
- .when(actorContext).executeOperationAsync(any(ActorRef.class),
- any(Object.class), any(Timeout.class));
- doReturn(mock(DatastoreContext.class)).when(actorContext).getDatastoreContext();
-
- proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
- AsyncDataBroker.DataChangeScope.ONE);
-
- Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
-
- proxy.close();
- }
- };
- }
-
- @Test
- public void testCloseBeforeRegistration() {
- new TestKit(getSystem()) {
- {
- ActorContext actorContext = mock(ActorContext.class);
-
- String shardName = "shard-1";
- final DataChangeListenerRegistrationProxy proxy = new DataChangeListenerRegistrationProxy(
- shardName, actorContext, mockListener);
-
- doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
- doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
- doReturn(getSystem()).when(actorContext).getActorSystem();
- doReturn(Dispatchers.DEFAULT_DISPATCHER_PATH).when(actorContext).getNotificationDispatcherPath();
- doReturn(getSystem().actorSelection(getRef().path()))
- .when(actorContext).actorSelection(getRef().path());
- doReturn(duration("5 seconds")).when(actorContext).getOperationDuration();
- doReturn(Futures.successful(getRef())).when(actorContext).findLocalShardAsync(eq(shardName));
-
- Answer<Future<Object>> answer = invocation -> {
- proxy.close();
- return Futures.successful((Object)new RegisterDataTreeNotificationListenerReply(getRef()));
- };
-
- doAnswer(answer).when(actorContext).executeOperationAsync(any(ActorRef.class),
- any(Object.class), any(Timeout.class));
-
- proxy.init(YangInstanceIdentifier.of(TestModel.TEST_QNAME),
- AsyncDataBroker.DataChangeScope.ONE);
-
- expectMsgClass(duration("5 seconds"), CloseDataTreeNotificationListenerRegistration.class);
-
- Assert.assertEquals("getListenerRegistrationActor", null, proxy.getListenerRegistrationActor());
- proxy.close();
- }
- };
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_CONTAINER_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_CONTAINER_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryKey;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNodeEntry;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
-
-import akka.actor.ActorRef;
-import akka.dispatch.Dispatchers;
-import akka.testkit.TestActorRef;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
-/**
- * Unit tests for DataChangeListenerSupport.
- *
- * @author Thomas Pantelis
- */
-public class DataChangeListenerSupportTest extends AbstractShardTest {
-
- @Test
- public void testChangeListenerWithNoInitialData() throws Exception {
-
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> actor = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testChangeListenerWithNoInitialData");
-
- waitUntilLeader(actor);
-
- final Shard shard = actor.underlyingActor();
- final MockDataChangeListener listener = new MockDataChangeListener(0);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
- "testChangeListenerWithNoInitialData-DataChangeListener");
- final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
- true,true);
-
- listener.expectNoMoreChanges("Unexpected initial change event");
- }
- };
- }
-
- @Test
- public void testInitialChangeListenerEventWithContainerPath() throws Exception {
-
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> actor = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testInitialChangeListenerEventWithContainerPath");
-
- waitUntilLeader(actor);
-
- final Shard shard = actor.underlyingActor();
- writeToStore(shard.getDataStore(), TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, TEST_PATH),
- "testInitialChangeListenerEventWithContainerPath-DataChangeListener");
- final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(TEST_PATH, dclActor, DataChangeScope.ONE, false),
- true,true);
-
- listener.waitForChangeEvents(TEST_PATH);
- }
- };
- }
-
- @Test
- public void testInitialChangeListenerEventWithListPath() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> actor = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testInitialChangeListenerEventWithListPath");
-
- waitUntilLeader(actor);
-
- final Shard shard = actor.underlyingActor();
- mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, OUTER_LIST_PATH),
- "testInitialChangeListenerEventWithListPath-DataChangeListener");
- final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(OUTER_LIST_PATH, dclActor, DataChangeScope.ONE, false),
- true, true);
-
- listener.waitForChangeEvents();
- assertEquals("Outer entry 1 present", true, NormalizedNodes
- .findNode(listener.getCreatedData(0, OUTER_LIST_PATH), outerEntryKey(1)).isPresent());
- assertEquals("Outer entry 2 present", true, NormalizedNodes
- .findNode(listener.getCreatedData(0, OUTER_LIST_PATH), outerEntryKey(2)).isPresent());
- }
- };
- }
-
- @Test
- public void testInitialChangeListenerEventWithWildcardedListPath() throws Exception {
-
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> actor = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testInitialChangeListenerEventWithWildcardedListPath");
-
- waitUntilLeader(actor);
-
- final Shard shard = actor.underlyingActor();
-
- mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
- writeToStore(shard.getDataStore(), OUTER_CONTAINER_PATH,
- ImmutableNodes.containerNode(OUTER_CONTAINER_QNAME));
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- "testInitialChangeListenerEventWithWildcardedListPath-DataChangeListener");
- final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
-
- listener.waitForChangeEvents();
- listener.verifyCreatedData(0, outerEntryPath(1));
- listener.verifyCreatedData(0, outerEntryPath(2));
- listener.verifyNoCreatedData(0, OUTER_CONTAINER_PATH);
- }
- };
- }
-
- @Test
- public void testInitialChangeListenerEventWithNestedWildcardedListsPath() throws Exception {
-
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> actor = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testInitialChangeListenerEventWithNestedWildcardedListsPath");
-
- waitUntilLeader(actor);
-
- final Shard shard = actor.underlyingActor();
-
- mergeToStore(shard.getDataStore(), TEST_PATH,
- testNodeWithOuter(outerNode(outerNodeEntry(1, innerNode("one", "two")),
- outerNodeEntry(2, innerNode("three", "four")))));
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME)
- .node(INNER_LIST_QNAME).node(INNER_LIST_QNAME);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener");
- final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), true, true);
-
-
- listener.waitForChangeEvents();
- listener.verifyCreatedData(0, innerEntryPath(1, "one"));
- listener.verifyCreatedData(0, innerEntryPath(1, "two"));
- listener.verifyCreatedData(0, innerEntryPath(2, "three"));
- listener.verifyCreatedData(0, innerEntryPath(2, "four"));
-
- // Register for a specific outer list entry
- final MockDataChangeListener listener2 = new MockDataChangeListener(1);
- final YangInstanceIdentifier path2 = OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME)
- .node(INNER_LIST_QNAME);
- final ActorRef dclActor2 = actorFactory.createActor(DataChangeListener.props(listener2, path2),
- "testInitialChangeListenerEventWithNestedWildcardedListsPath-DataChangeListener2");
- final DataChangeListenerSupport support2 = new DataChangeListenerSupport(shard);
- support2.onMessage(new RegisterChangeListener(path2, dclActor2, DataChangeScope.ONE, false),
- true, true);
-
- listener2.waitForChangeEvents();
- listener2.verifyCreatedData(0, innerEntryPath(1, "one"));
- listener2.verifyCreatedData(0, innerEntryPath(1, "two"));
- listener2.verifyNoCreatedData(0, innerEntryPath(2, "three"));
- listener2.verifyNoCreatedData(0, innerEntryPath(2, "four"));
- }
- };
- }
-
- @Test
- public void testInitialChangeListenerEventWhenNotInitiallyLeader() throws Exception {
-
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> actor = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testInitialChangeListenerEventWhenNotInitiallyLeader");
-
- waitUntilLeader(actor);
-
- final Shard shard = actor.underlyingActor();
-
- mergeToStore(shard.getDataStore(), TEST_PATH,
- testNodeWithOuter(outerNode(outerNodeEntry(1, innerNode("one", "two")),
- outerNodeEntry(2, innerNode("three", "four")))));
-
- final MockDataChangeListener listener = new MockDataChangeListener(0);
- final YangInstanceIdentifier path = OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME)
- .node(INNER_LIST_QNAME);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- "testInitialChangeListenerEventWhenNotInitiallyLeader-DataChangeListener");
- final DataChangeListenerSupport support = new DataChangeListenerSupport(shard);
- support.onMessage(new RegisterChangeListener(path, dclActor, DataChangeScope.ONE, false), false, true);
-
- listener.expectNoMoreChanges("Unexpected initial change event");
- listener.reset(1);
-
- support.onLeadershipChange(true, true);
-
- listener.waitForChangeEvents();
- listener.verifyCreatedData(0, innerEntryPath(1, "one"));
- listener.verifyCreatedData(0, innerEntryPath(1, "two"));
- listener.verifyCreatedData(0, innerEntryPath(2, "three"));
- listener.verifyCreatedData(0, innerEntryPath(2, "four"));
- }
- };
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
-
-import akka.actor.ActorRef;
-import akka.actor.DeadLetter;
-import akka.actor.Props;
-import akka.testkit.javadsl.TestKit;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
-import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
-import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-
-public class DataChangeListenerTest extends AbstractActorTest {
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testDataChangedWhenNotificationsAreEnabled() {
- new TestKit(getSystem()) {
- {
- final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
- final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener, TEST_PATH);
- final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
-
- // Let the DataChangeListener know that notifications should be
- // enabled
- subject.tell(new EnableNotification(true, "test"), getRef());
-
- subject.tell(new DataChanged(mockChangeEvent), getRef());
-
- expectMsgClass(DataChangedReply.class);
-
- Mockito.verify(mockListener).onDataChanged(mockChangeEvent);
- }
- };
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testDataChangedWhenNotificationsAreDisabled() {
- new TestKit(getSystem()) {
- {
- final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
- final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener, TEST_PATH);
- final ActorRef subject = getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
-
- subject.tell(new DataChanged(mockChangeEvent), getRef());
-
- within(duration("1 seconds"), () -> {
- expectNoMsg();
- Mockito.verify(mockListener, Mockito.never())
- .onDataChanged(Mockito.any(AsyncDataChangeEvent.class));
- return null;
- });
- }
- };
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testDataChangedWithNoSender() {
- new TestKit(getSystem()) {
- {
- final AsyncDataChangeEvent mockChangeEvent = Mockito.mock(AsyncDataChangeEvent.class);
- final AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- final Props props = DataChangeListener.props(mockListener, TEST_PATH);
- final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
-
- getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
-
- subject.tell(new DataChanged(mockChangeEvent), ActorRef.noSender());
-
- // Make sure no DataChangedReply is sent to DeadLetters.
- while (true) {
- DeadLetter deadLetter;
- try {
- deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class);
- } catch (AssertionError e) {
- // Timed out - got no DeadLetter - this is good
- break;
- }
-
- // We may get DeadLetters for other messages we don't care
- // about.
- Assert.assertFalse("Unexpected DataChangedReply", deadLetter.message() instanceof DataChangedReply);
- }
- }
- };
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Test
- public void testDataChangedWithListenerRuntimeEx() {
- new TestKit(getSystem()) {
- {
- final AsyncDataChangeEvent mockChangeEvent1 = Mockito.mock(AsyncDataChangeEvent.class);
- final AsyncDataChangeEvent mockChangeEvent2 = Mockito.mock(AsyncDataChangeEvent.class);
- final AsyncDataChangeEvent mockChangeEvent3 = Mockito.mock(AsyncDataChangeEvent.class);
-
- AsyncDataChangeListener mockListener = Mockito.mock(AsyncDataChangeListener.class);
- Mockito.doThrow(new RuntimeException("mock")).when(mockListener).onDataChanged(mockChangeEvent2);
-
- Props props = DataChangeListener.props(mockListener, TEST_PATH);
- ActorRef subject = getSystem().actorOf(props, "testDataChangedWithListenerRuntimeEx");
-
- // Let the DataChangeListener know that notifications should be
- // enabled
- subject.tell(new EnableNotification(true, "test"), getRef());
-
- subject.tell(new DataChanged(mockChangeEvent1), getRef());
- expectMsgClass(DataChangedReply.class);
-
- subject.tell(new DataChanged(mockChangeEvent2), getRef());
- expectMsgClass(DataChangedReply.class);
-
- subject.tell(new DataChanged(mockChangeEvent3), getRef());
- expectMsgClass(DataChangedReply.class);
-
- Mockito.verify(mockListener).onDataChanged(mockChangeEvent1);
- Mockito.verify(mockListener).onDataChanged(mockChangeEvent2);
- Mockito.verify(mockListener).onDataChanged(mockChangeEvent3);
- }
- };
- }
-}
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
};
}
- @Test
- public void testChangeListenerRegistration() throws Exception {
- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- try (AbstractDataStore dataStore = setupAbstractDataStore(
- testParameter, "testChangeListenerRegistration", "test-1")) {
-
- testWriteTransaction(dataStore, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
-
- final ListenerRegistration<MockDataChangeListener> listenerReg = dataStore
- .registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE);
-
- assertNotNull("registerChangeListener returned null", listenerReg);
-
- IntegrationTestKit.verifyShardState(dataStore, "test-1",
- state -> assertEquals("getDataChangeListenerActors", 1,
- state.getDataChangeListenerActors().size()));
-
- // Wait for the initial notification
- listener.waitForChangeEvents(TestModel.TEST_PATH);
- listener.reset(2);
-
- // Write 2 updates.
- testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
-
- YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
- testWriteTransaction(dataStore, listPath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
-
- // Wait for the 2 updates.
- listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
- listenerReg.close();
-
- IntegrationTestKit.verifyShardState(dataStore, "test-1",
- state -> assertEquals("getDataChangeListenerActors", 0,
- state.getDataChangeListenerActors().size()));
-
- testWriteTransaction(dataStore,
- YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
- .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
-
- listener.expectNoMoreChanges("Received unexpected change after close");
- }
- }
- };
- }
-
@Test
public void testDataTreeChangeListenerRegistration() throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
-import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
+ "InMemorySnapshotStore and journal recovery seq number will start from 1";
- @Test
- public void testRegisterChangeListener() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListener");
-
- waitUntilLeader(shard);
-
- shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
-
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
- TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
-
- shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, dclActor,
- AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- final String replyPath = reply.getListenerRegistrationPath().toString();
- assertTrue("Incorrect reply path: " + replyPath,
- replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents(path);
- }
- };
- }
-
- @SuppressWarnings("serial")
- @Test
- public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
- // This test tests the timing window in which a change listener is registered before the
- // shard becomes the leader. We verify that the listener is registered and notified of the
- // existing data when the shard becomes the leader.
- // For this test, we want to send the RegisterChangeListener message after the shard
- // has recovered from persistence and before it becomes the leader. So we subclass
- // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
- // we know that the shard has been initialized to a follower and has started the
- // election process. The following 2 CountDownLatches are used to coordinate the
- // ElectionTimeout with the sending of the RegisterChangeListener message.
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- final Creator<Shard> creator = new Creator<Shard>() {
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- // Use a non persistent provider because this test actually invokes persist on the journal
- // this will cause all other messages to not be queued properly after that.
- // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
- // it does do a persist)
- return new Shard(newShardBuilder()) {
- @Override
- public void handleCommand(final Object message) {
- if (message instanceof ElectionTimeout && firstElectionTimeout) {
- // Got the first ElectionTimeout. We don't forward it to the
- // base Shard yet until we've sent the RegisterChangeListener
- // message. So we signal the onFirstElectionTimeout latch to tell
- // the main thread to send the RegisterChangeListener message and
- // start a thread to wait on the onChangeListenerRegistered latch,
- // which the main thread signals after it has sent the message.
- // After the onChangeListenerRegistered is triggered, we send the
- // original ElectionTimeout message to proceed with the election.
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread(() -> {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }).start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.handleCommand(message);
- }
- }
- };
- }
- };
-
- setupInMemorySnapshotStore();
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testRegisterChangeListenerWhenNotLeaderInitially");
-
- new ShardTestKit(getSystem()) {
- {
- // Wait until the shard receives the first ElectionTimeout
- // message.
- assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
- // Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE, false),
- getRef());
-
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- // Sanity check - verify the shard is not the leader yet.
- shard.tell(FindLeader.INSTANCE, getRef());
- final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
-
- // Signal the onChangeListenerRegistered latch to tell the
- // thread above to proceed
- // with the election process.
- onChangeListenerRegistered.countDown();
-
- // Wait for the shard to become the leader and notify our
- // listener with the existing
- // data in the store.
- listener.waitForChangeEvents(path);
- }
- };
- }
-
@Test
public void testRegisterDataTreeChangeListener() throws Exception {
new ShardTestKit(getSystem()) {
assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
}
- @Test
- public void testClusteredDataChangeListenerWithDelayedRegistration() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final String testName = "testClusteredDataChangeListenerWithDelayedRegistration";
- dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
- .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- setupInMemorySnapshotStore();
-
- final TestActorRef<Shard> shard = actorFactory.createTestActor(
- newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
- actorFactory.generateActorId(testName + "-shard"));
-
- waitUntilNoLeader(shard);
-
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
- getRef());
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
- .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
-
- listener.waitForChangeEvents();
- }
- };
- }
-
- @Test
- public void testClusteredDataChangeListenerRegistration() throws Exception {
- new ShardTestKit(getSystem()) {
- {
- final String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
-
- final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
- MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
-
- final TestActorRef<Shard> followerShard = actorFactory
- .createTestActor(Shard.builder().id(followerShardID)
- .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
- .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
- "akka://test/user/" + leaderShardID.toString()))
- .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
-
- final TestActorRef<Shard> leaderShard = actorFactory
- .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
- .peerAddresses(Collections.singletonMap(followerShardID.toString(),
- "akka://test/user/" + followerShardID.toString()))
- .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
- .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
-
- leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
- final String leaderPath = waitUntilLeader(followerShard);
- assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
-
- final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
- actorFactory.generateActorId(testName + "-DataChangeListener"));
-
- followerShard.tell(
- new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true),
- getRef());
- final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterDataTreeNotificationListenerReply.class);
- assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
-
- writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- listener.waitForChangeEvents();
- }
- };
- }
-
@Test
public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.commons.lang.SerializationUtils;
-import org.junit.Test;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-
-/**
- * Unit tests for DataChanged.
- *
- * @author Thomas Pantelis
- */
-public class DataChangedTest {
-
- @Test
- public void testSerialization() {
- DOMImmutableDataChangeEvent change = DOMImmutableDataChangeEvent.builder(DataChangeScope.SUBTREE)
- .addCreated(TestModel.TEST_PATH,
- ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build())
- .addUpdated(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
- ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build())
- .addRemoved(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())
- .setBefore(ImmutableNodes.containerNode(TestModel.TEST_QNAME))
- .setAfter(ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
- .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "bar")).build())
- .build();
-
- DataChanged expected = new DataChanged(change);
-
- DataChanged actual = (DataChanged) SerializationUtils.clone(expected);
-
- assertEquals("getCreatedData", change.getCreatedData(), actual.getChange().getCreatedData());
- assertEquals("getOriginalData", change.getOriginalData(), actual.getChange().getOriginalData());
- assertEquals("getOriginalSubtree", change.getOriginalSubtree(), actual.getChange().getOriginalSubtree());
- assertEquals("getRemovedPaths", change.getRemovedPaths(), actual.getChange().getRemovedPaths());
- assertEquals("getUpdatedData", change.getUpdatedData(), actual.getChange().getUpdatedData());
- assertEquals("getUpdatedSubtree", change.getUpdatedSubtree(), actual.getChange().getUpdatedSubtree());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * A mock DataChangeListener implementation.
- *
- * @author Thomas Pantelis
- */
-public class MockDataChangeListener implements
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-
- private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> changeList =
- Collections.synchronizedList(Lists.<AsyncDataChangeEvent<YangInstanceIdentifier,
- NormalizedNode<?, ?>>>newArrayList());
-
- private volatile CountDownLatch changeLatch;
- private int expChangeEventCount;
-
- public MockDataChangeListener(int expChangeEventCount) {
- reset(expChangeEventCount);
- }
-
- public void reset(int newExpChangeEventCount) {
- changeLatch = new CountDownLatch(newExpChangeEventCount);
- this.expChangeEventCount = newExpChangeEventCount;
- changeList.clear();
- }
-
- @Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- changeList.add(change);
- changeLatch.countDown();
- }
-
- public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
- boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
- if (!done) {
- fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
- expChangeEventCount, expChangeEventCount - changeLatch.getCount()));
- }
-
- for (int i = 0; i < expPaths.length; i++) {
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = changeList.get(i).getCreatedData();
- assertTrue(String.format("Change %d does not contain %s. Actual: %s", i + 1, expPaths[i], createdData),
- createdData.containsKey(expPaths[i]));
- }
- }
-
- public NormalizedNode<?, ?> getCreatedData(int num, YangInstanceIdentifier path) {
- return changeList.get(num).getCreatedData().get(path);
- }
-
- public void verifyCreatedData(int num, YangInstanceIdentifier path) {
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = changeList.get(num).getCreatedData();
- assertTrue(path + " not present in " + createdData.keySet(), createdData.get(path) != null);
- }
-
- public void expectNoMoreChanges(String assertMsg) {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- assertEquals(assertMsg, expChangeEventCount, changeList.size());
- }
-
- public void verifyNoCreatedData(int num, YangInstanceIdentifier path) {
- Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = changeList.get(num).getCreatedData();
- assertTrue("Unexpected " + path + " present in createdData", createdData.get(path) == null);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.md.sal.dom.api;
-
-/**
- * ClusteredDOMDataChangeListener is a marker interface to enable data change
- * notifications on all instances in a cluster, where this listener is
- * registered.
- *
- * <p>
- * Applications should implement ClusteredDOMDataChangeListener instead of DOMDataChangeListener, if they want to
- * listen to data change notifications on any node of clustered datastore. DOMDataChangeListener enables data change
- * notifications only at leader of the datastore shard.
- */
-public interface ClusteredDOMDataChangeListener extends DOMDataChangeListener{
-}
*
*/
public interface DOMDataBroker extends
- AsyncDataBroker<YangInstanceIdentifier, NormalizedNode<?, ?>, DOMDataChangeListener>,
+ AsyncDataBroker<YangInstanceIdentifier, NormalizedNode<?, ?>>,
TransactionChainFactory<YangInstanceIdentifier, NormalizedNode<?, ?>>, BrokerService,
DOMExtensibleService<DOMDataBroker, DOMDataBrokerExtension> {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.api;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * DOMDataChangeListener enables data change notifications only at leader of the datastore shard.
- */
-@Deprecated
-public interface DOMDataChangeListener extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-
-}
/**
* Interface implemented by classes interested in receiving notifications about
- * data tree changes. This interface differs from {@link DOMDataChangeListener}
- * in that it provides a cursor-based view of the change, which has potentially
+ * data tree changes. This interface provides a cursor-based view of the change, which has potentially
* lower overhead.
*
* <p>
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return "DOM-" + txNum.getAndIncrement();
}
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path, final DOMDataChangeListener listener,
- final DataChangeScope triggeringScope) {
- DOMStore potentialStore = getTxFactories().get(store);
- checkState(potentialStore != null, "Requested logical data store is not available.");
- return potentialStore.registerChangeListener(path, listener, triggeringScope);
- }
-
@Nonnull
@Override
public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataBrokerAdapter;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
* DOMDataBroker implementation that forwards calls to
delegateDataBroker.newWriteOnlyTransaction());
}
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path,
- final DOMDataChangeListener listener,
- final DataChangeScope
- triggeringScope) {
- throw new UnsupportedOperationException(
- "Registering data change listeners is not supported in " + "md-sal forwarding data broker");
-
- }
-
@Override
public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
return new ShardedDOMDataBrokerDelegatingTransactionChain(chainNum.getAndIncrement(),
package org.opendaylight.controller.sal.dom.broker.osgi;
import java.util.Map;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.osgi.framework.ServiceReference;
public class DOMDataBrokerProxy extends AbstractBrokerServiceProxy<DOMDataBroker> implements DOMDataBroker {
return getDelegate().newWriteOnlyTransaction();
}
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path, final DOMDataChangeListener listener,
- final org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope triggeringScope) {
- return getDelegate().registerDataChangeListener(store, path, listener, triggeringScope);
- }
-
@Override
public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
return getDelegate().createTransactionChain(listener);
package org.opendaylight.controller.sal.core.compat;
import com.google.common.collect.ForwardingObject;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public abstract class DOMStoreAdapter<T extends org.opendaylight.mdsal.dom.spi.store.DOMStore
& org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher> extends ForwardingObject
return new DOMStoreTransactionChainAdapter(delegate().createTransactionChain());
}
- @Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L>
- registerChangeListener(final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
- throw new UnsupportedOperationException();
- }
-
@Override
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
final YangInstanceIdentifier treeId, final L listener) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.core.compat;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public interface DataChangeListenerRegistration<L extends AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> extends ListenerRegistration<L> {
- @Override
- L getInstance();
-
- YangInstanceIdentifier getPath();
-
- DataChangeScope getScope();
-}
+++ /dev/null
-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.core.compat;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-abstract class DataChangeListenerRegistrationImpl<T extends AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> extends AbstractListenerRegistration<T> implements DataChangeListenerRegistration<T> {
- DataChangeListenerRegistrationImpl(final T listener) {
- super(listener);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.core.compat;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
-import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * A set of listeners organized as a tree by node to which they listen. This class
- * allows for efficient lookup of listeners when we walk the DataTreeCandidate.
- *
- * @author Robert Varga
- */
-public final class ListenerTree extends AbstractRegistrationTree<DataChangeListenerRegistration<?>> {
- private ListenerTree() {
- // Private to disallow direct instantiation
- }
-
- /**
- * Create a new empty instance of the listener tree.
- *
- * @return An empty instance.
- */
- public static ListenerTree create() {
- return new ListenerTree();
- }
-
- /**
- * Registers listener on this node.
- *
- * @param path Full path on which listener is registered.
- * @param listener Listener
- * @param scope Scope of triggering event.
- * @return Listener registration
- */
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistration<L> registerDataChangeListener(final YangInstanceIdentifier path,
- final L listener, final DataChangeScope scope) {
-
- // Take the write lock
- takeLock();
- try {
- final RegistrationTreeNode<DataChangeListenerRegistration<?>> node = findNodeFor(path.getPathArguments());
- DataChangeListenerRegistration<L> reg = new DataChangeListenerRegistrationImpl<L>(listener) {
- @Override
- public DataChangeScope getScope() {
- return scope;
- }
-
- @Override
- public YangInstanceIdentifier getPath() {
- return path;
- }
-
- @Override
- protected void removeRegistration() {
- /*
- * TODO: Here's an interesting problem. The way the datastore works, it
- * enqueues requests towards the listener, so the listener will be
- * notified at some point in the future. Now if the registration is
- * closed, we will prevent any new events from being delivered, but
- * we have no way to purge that queue.
- *
- * While this does not directly violate the ListenerRegistration
- * contract, it is probably not going to be liked by the users.
- */
- ListenerTree.this.removeRegistration(node, this);
- }
- };
-
- addRegistration(node, reg);
- return reg;
- } finally {
- // Always release the lock
- releaseLock();
- }
- }
-}
import com.google.common.collect.ForwardingObject;
import java.util.Map;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
* Utility {@link DOMDataBroker} implementation which forwards all interface
@Override
protected abstract @Nonnull DOMDataBroker delegate();
- @Override
- public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path, final DOMDataChangeListener listener,
- final DataChangeScope triggeringScope) {
- return delegate().registerDataChangeListener(store, path, listener, triggeringScope);
- }
-
@Override
public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
return delegate().newReadOnlyTransaction();
*/
package org.opendaylight.controller.sal.core.spi.data;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
*
*/
public interface DOMStore extends DOMStoreTransactionFactory {
-
- /**
- * Registers {@link org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener} for Data Change
- * callbacks which will be triggered on the change of provided subpath. What constitutes a change
- * depends on the @scope parameter.
- *
- * Listener upon registration receives an initial callback
- * {@link AsyncDataChangeListener#onDataChanged(org.opendaylight.controller.md.sal.common.api.data
- * .AsyncDataChangeEvent)}
- * which contains stable view of data tree at the time of registration.
- *
- * Â @param path Path (subtree identifier) on which client listener will be
- * invoked.
- *
- * @param listener
- * Instance of listener which should be invoked on
- * @param scope
- * Scope of change which triggers callback.
- * @return Listener Registration object, which client may use to close
- * registration / interest on receiving data changes.
- *
- */
- <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L>
- registerChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope);
-
/**
* Creates new transaction chain.
*
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-public final class DOMImmutableDataChangeEvent implements
- AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-
- private static final RemoveEventFactory REMOVE_EVENT_FACTORY = new RemoveEventFactory();
- private static final CreateEventFactory CREATE_EVENT_FACTORY = new CreateEventFactory();
-
- private final NormalizedNode<?, ?> original;
- private final NormalizedNode<?, ?> updated;
- private final Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
- private final Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
- private final Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
- private final Set<YangInstanceIdentifier> removedPaths;
- private final DataChangeScope scope;
-
- private DOMImmutableDataChangeEvent(final Builder change) {
- original = change.before;
- updated = change.after;
- originalData = Collections.unmodifiableMap(change.original);
- createdData = Collections.unmodifiableMap(change.created);
- updatedData = Collections.unmodifiableMap(change.updated);
- removedPaths = Collections.unmodifiableSet(change.removed);
- scope = change.scope;
- }
-
- public static Builder builder(final DataChangeScope scope) {
- return new Builder(scope);
- }
-
- protected DataChangeScope getScope() {
- return scope;
- }
-
- @Override
- public NormalizedNode<?, ?> getOriginalSubtree() {
- return original;
- }
-
- @Override
- public NormalizedNode<?, ?> getUpdatedSubtree() {
- return updated;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
- return originalData;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
- return createdData;
- }
-
- @Override
- public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
- return updatedData;
- }
-
- @Override
- public Set<YangInstanceIdentifier> getRemovedPaths() {
- return removedPaths;
- }
-
- @Override
- public String toString() {
- return "DOMImmutableDataChangeEvent [created=" + createdData.keySet() + ", updated=" + updatedData.keySet()
- + ", removed=" + removedPaths + "]";
- }
-
- /**
- * Simple event factory which creates event based on path and data.
- */
- public interface SimpleEventFactory {
- DOMImmutableDataChangeEvent create(YangInstanceIdentifier path, NormalizedNode<PathArgument,?> data);
- }
-
- /**
- * Event factory which takes after state and creates event for it.
- *
- * <p>
- * Factory for events based on path and after state.
- * After state is set as {@link #getUpdatedSubtree()} and is path,
- * state mapping is also present in {@link #getUpdatedData()}.
- */
- public static SimpleEventFactory getCreateEventFactory() {
- return CREATE_EVENT_FACTORY;
- }
-
- /**
- * Event factory which takes before state and creates event for it.
- *
- * <p>
- * Factory for events based on path and after state.
- * After state is set as {@link #getOriginalSubtree()} and is path,
- * state mapping is also present in {@link #getOriginalSubtree()}.
- *
- * <p>
- * Path is present in {@link #getRemovedPaths()}.
- */
- public static SimpleEventFactory getRemoveEventFactory() {
- return REMOVE_EVENT_FACTORY;
- }
-
- public static final class Builder {
-
- public DataChangeScope scope;
- private NormalizedNode<?, ?> after;
- private NormalizedNode<?, ?> before;
-
- private final Map<YangInstanceIdentifier, NormalizedNode<?, ?>> original = new HashMap<>();
- private final Map<YangInstanceIdentifier, NormalizedNode<?, ?>> created = new HashMap<>();
- private final Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updated = new HashMap<>();
- private final Set<YangInstanceIdentifier> removed = new HashSet<>();
-
- private Builder(final DataChangeScope scope) {
- this.scope = Preconditions.checkNotNull(scope, "Data change scope should not be null.");
- }
-
- public Builder setAfter(final NormalizedNode<?, ?> node) {
- after = node;
- return this;
- }
-
- public DOMImmutableDataChangeEvent build() {
-
- return new DOMImmutableDataChangeEvent(this);
- }
-
- public void merge(final DOMImmutableDataChangeEvent nestedChanges) {
-
- original.putAll(nestedChanges.getOriginalData());
- created.putAll(nestedChanges.getCreatedData());
- updated.putAll(nestedChanges.getUpdatedData());
- removed.addAll(nestedChanges.getRemovedPaths());
-
- }
-
- public Builder setBefore(final NormalizedNode<?, ?> node) {
- this.before = node;
- return this;
- }
-
- public Builder addCreated(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) {
- created.put(path, node);
- return this;
- }
-
- public Builder addRemoved(final YangInstanceIdentifier path, final NormalizedNode<?, ?> node) {
- original.put(path, node);
- removed.add(path);
- return this;
- }
-
- public Builder addUpdated(final YangInstanceIdentifier path, final NormalizedNode<?, ?> nodeBefore,
- final NormalizedNode<?, ?> nodeAfter) {
- original.put(path, nodeBefore);
- updated.put(path, nodeAfter);
- return this;
- }
-
- public boolean isEmpty() {
- return created.isEmpty() && removed.isEmpty() && updated.isEmpty();
- }
- }
-
- private static final class RemoveEventFactory implements SimpleEventFactory {
-
- @Override
- public DOMImmutableDataChangeEvent create(final YangInstanceIdentifier path,
- final NormalizedNode<PathArgument, ?> data) {
- return builder(DataChangeScope.BASE) //
- .setBefore(data) //
- .addRemoved(path, data) //
- .build();
- }
-
- }
-
- private static final class CreateEventFactory implements SimpleEventFactory {
-
- @Override
- public DOMImmutableDataChangeEvent create(final YangInstanceIdentifier path,
- final NormalizedNode<PathArgument, ?> data) {
- return builder(DataChangeScope.BASE) //
- .setAfter(data) //
- .addCreated(path, data) //
- .build();
- }
- }
-}
package org.opendaylight.controller.md.sal.dom.store.impl;
import com.google.common.base.Preconditions;
-import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.sal.core.compat.DataChangeListenerRegistration;
-import org.opendaylight.controller.sal.core.compat.ListenerTree;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ExecutorServiceUtil;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
-import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
* Implementation of {@link DOMStore} which uses {@link DataTree} and other
* classes such as {@link SnapshotBackedWriteTransaction}.
* {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedReadTransaction} and
- * {@link ResolveDataChangeEventsTask}
* to implement {@link DOMStore} contract.
*
*/
implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
- private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>
- DCL_NOTIFICATION_MGR_INVOKER =
- (listener, notification) -> {
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> inst =
- listener.getInstance();
- inst.onDataChanged(notification);
- };
-
private final DataTree dataTree;
- private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
- private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>
- dataChangeListenerNotificationManager;
private final InMemoryDOMStoreTreeChangePublisher changePublisher;
private final ExecutorService dataChangeListenerExecutor;
private final boolean debugTransactions;
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
this.debugTransactions = debugTransactions;
- dataChangeListenerNotificationManager =
- new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
- DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
- "DataChangeListenerQueueMgr");
changePublisher = new InMemoryDOMStoreTreeChangePublisher(this.dataChangeListenerExecutor,
maxDataChangeListenerQueueSize);
}
public QueuedNotificationManager<?, ?> getDataChangeListenerNotificationManager() {
- return dataChangeListenerNotificationManager;
+ return changePublisher.getNotificationManager();
}
@Override
return dataTree.takeSnapshot();
}
- @Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L>
- registerChangeListener(final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
-
- /*
- * Make sure commit is not occurring right now. Listener has to be
- * registered and its state capture enqueued at a consistent point.
- *
- * FIXME: improve this to read-write lock, such that multiple listener
- * registrations can occur simultaneously
- */
- final DataChangeListenerRegistration<L> reg;
- synchronized (this) {
- LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
-
- reg = listenerTree.registerDataChangeListener(path, listener, scope);
-
- Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
- if (currentState.isPresent()) {
- final NormalizedNode<?, ?> data = currentState.get();
-
- final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
- .setAfter(data) //
- .addCreated(path, data) //
- .build();
-
- dataChangeListenerNotificationManager.submitNotification(reg, event);
- }
- }
-
- return new AbstractListenerRegistration<L>(listener) {
- @Override
- protected void removeRegistration() {
- synchronized (InMemoryDOMDataStore.this) {
- reg.close();
- }
- }
- };
- }
-
@Override
public synchronized <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
final YangInstanceIdentifier treeId, final L listener) {
synchronized void commit(final DataTreeCandidate candidate) {
dataTree.commit(candidate);
changePublisher.publishChange(candidate);
- ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager);
}
}
this.notificationManager = notificationManager;
}
+ QueuedNotificationManager<AbstractDOMDataTreeChangeListenerRegistration<?>, DataTreeCandidate>
+ getNotificationManager() {
+ return notificationManager;
+ }
+
@Override
protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations,
final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.Map.Entry;
-import java.util.Optional;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
-import org.opendaylight.controller.sal.core.compat.DataChangeListenerRegistration;
-import org.opendaylight.controller.sal.core.compat.ListenerTree;
-import org.opendaylight.mdsal.dom.spi.RegistrationTreeSnapshot;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Resolve Data Change Events based on modifications and listeners
- *
- * <p>
- * Computes data change events for all affected registered listeners in data tree.
- */
-@Beta
-public final class ResolveDataChangeEventsTask {
- private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeEventsTask.class);
-
- private final DataTreeCandidate candidate;
- private final ListenerTree listenerRoot;
-
- private Multimap<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> collectedEvents;
-
- private ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
- this.candidate = Preconditions.checkNotNull(candidate);
- this.listenerRoot = Preconditions.checkNotNull(listenerTree);
- }
-
- /**
- * Resolves and submits notification tasks to the specified manager.
- */
- public synchronized void resolve(final NotificationManager<DataChangeListenerRegistration<?>,
- DOMImmutableDataChangeEvent> manager) {
- try (RegistrationTreeSnapshot<DataChangeListenerRegistration<?>> w = listenerRoot.takeSnapshot()) {
- // Defensive: reset internal state
- collectedEvents = ArrayListMultimap.create();
-
- // Run through the tree
- final ResolveDataChangeState s = ResolveDataChangeState.initial(candidate.getRootPath(), w.getRootNode());
- resolveAnyChangeEvent(s, candidate.getRootNode());
-
- /*
- * Convert to tasks, but be mindful of multiple values -- those indicate multiple
- * wildcard matches, which need to be merged.
- */
- for (Entry<DataChangeListenerRegistration<?>, Collection<DOMImmutableDataChangeEvent>> e :
- collectedEvents.asMap().entrySet()) {
- final Collection<DOMImmutableDataChangeEvent> col = e.getValue();
- final DOMImmutableDataChangeEvent event;
-
- if (col.size() != 1) {
- final Builder b = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE);
- for (DOMImmutableDataChangeEvent i : col) {
- b.merge(i);
- }
-
- event = b.build();
- LOG.trace("Merged events {} into event {}", col, event);
- } else {
- event = col.iterator().next();
- }
-
- manager.submitNotification(e.getKey(), event);
- }
- }
- }
-
- /**
- * Resolves data change event for supplied node.
- *
- * @param path
- * Path to current node in tree
- * @param listeners
- * Collection of Listener registration nodes interested in
- * subtree
- * @param modification
- * Modification of current node
- * @param before
- * - Original (before) state of current node
- * @param after
- * - After state of current node
- * @return True if the subtree changed, false otherwise
- */
- private boolean resolveAnyChangeEvent(final ResolveDataChangeState state, final DataTreeCandidateNode node) {
- final Optional<NormalizedNode<?, ?>> maybeBefore = node.getDataBefore();
- final Optional<NormalizedNode<?, ?>> maybeAfter = node.getDataAfter();
- final ModificationType type = node.getModificationType();
-
- if (type != ModificationType.UNMODIFIED && !maybeAfter.isPresent() && !maybeBefore.isPresent()) {
- LOG.debug("Modification at {} has type {}, but no before- and after-data. Assuming unchanged.",
- state.getPath(), type);
- return false;
- }
-
- // no before and after state is present
-
- switch (type) {
- case SUBTREE_MODIFIED:
- return resolveSubtreeChangeEvent(state, node);
- case APPEARED:
- case WRITE:
- Preconditions.checkArgument(maybeAfter.isPresent(),
- "Modification at {} has type {} but no after-data", state.getPath(), type);
- if (!maybeBefore.isPresent()) {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final NormalizedNode<PathArgument, ?> afterNode = (NormalizedNode)maybeAfter.get();
- resolveSameEventRecursivelly(state, afterNode, DOMImmutableDataChangeEvent.getCreateEventFactory());
- return true;
- }
-
- return resolveReplacedEvent(state, maybeBefore.get(), maybeAfter.get());
- case DISAPPEARED:
- case DELETE:
- Preconditions.checkArgument(maybeBefore.isPresent(),
- "Modification at {} has type {} but no before-data", state.getPath(), type);
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final NormalizedNode<PathArgument, ?> beforeNode = (NormalizedNode)maybeBefore.get();
- resolveSameEventRecursivelly(state, beforeNode, DOMImmutableDataChangeEvent.getRemoveEventFactory());
- return true;
- case UNMODIFIED:
- return false;
- default:
- break;
- }
-
- throw new IllegalStateException(String.format("Unhandled node state %s at %s", type, state.getPath()));
- }
-
- private boolean resolveReplacedEvent(final ResolveDataChangeState state,
- final NormalizedNode<?, ?> beforeData, final NormalizedNode<?, ?> afterData) {
-
- if (beforeData instanceof NormalizedNodeContainer<?, ?, ?>) {
- /*
- * Node is a container (contains a child) and we have interested
- * listeners registered for it, that means we need to do
- * resolution of changes on children level and can not
- * shortcut resolution.
- */
- LOG.trace("Resolving subtree replace event for {} before {}, after {}", state.getPath(), beforeData,
- afterData);
- @SuppressWarnings("unchecked")
- NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont =
- (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) beforeData;
- @SuppressWarnings("unchecked")
- NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont =
- (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) afterData;
- return resolveNodeContainerReplaced(state, beforeCont, afterCont);
- }
-
- // Node is a Leaf type (does not contain child nodes)
- // so normal equals method is sufficient for determining change.
- if (beforeData.equals(afterData)) {
- LOG.trace("Skipping equal leaf {}", state.getPath());
- return false;
- }
-
- LOG.trace("Resolving leaf replace event for {} , before {}, after {}", state.getPath(), beforeData, afterData);
- DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE)
- .addUpdated(state.getPath(), beforeData, afterData).build();
- state.addEvent(event);
- state.collectEvents(beforeData, afterData, collectedEvents);
- return true;
- }
-
- private boolean resolveNodeContainerReplaced(final ResolveDataChangeState state,
- final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont,
- final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont) {
- if (!state.needsProcessing()) {
- LOG.trace("Not processing replaced container {}", state.getPath());
- return true;
- }
-
- // We look at all children from before and compare it with after state.
- boolean childChanged = false;
- for (NormalizedNode<PathArgument, ?> beforeChild : beforeCont.getValue()) {
- final PathArgument childId = beforeChild.getIdentifier();
-
- if (resolveNodeContainerChildUpdated(state.child(childId), beforeChild, afterCont.getChild(childId))) {
- childChanged = true;
- }
- }
-
- for (NormalizedNode<PathArgument, ?> afterChild : afterCont.getValue()) {
- final PathArgument childId = afterChild.getIdentifier();
-
- /*
- * We have already iterated of the before-children, so have already
- * emitted modify/delete events. This means the child has been
- * created.
- */
- if (!beforeCont.getChild(childId).isPresent()) {
- resolveSameEventRecursivelly(state.child(childId), afterChild,
- DOMImmutableDataChangeEvent.getCreateEventFactory());
- childChanged = true;
- }
- }
-
- if (childChanged) {
- DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE)
- .addUpdated(state.getPath(), beforeCont, afterCont).build();
- state.addEvent(event);
- }
-
- state.collectEvents(beforeCont, afterCont, collectedEvents);
- return childChanged;
- }
-
- private boolean resolveNodeContainerChildUpdated(final ResolveDataChangeState state,
- final NormalizedNode<PathArgument, ?> before, final Optional<NormalizedNode<PathArgument, ?>> after) {
- if (after.isPresent()) {
- // REPLACE or SUBTREE Modified
- return resolveReplacedEvent(state, before, after.get());
- }
-
- // AFTER state is not present - child was deleted.
- resolveSameEventRecursivelly(state, before, DOMImmutableDataChangeEvent.getRemoveEventFactory());
- return true;
- }
-
- private void resolveSameEventRecursivelly(final ResolveDataChangeState state,
- final NormalizedNode<PathArgument, ?> node, final SimpleEventFactory eventFactory) {
- if (!state.needsProcessing()) {
- LOG.trace("Skipping child {}", state.getPath());
- return;
- }
-
- // We have listeners for this node or it's children, so we will try
- // to do additional processing
- if (node instanceof NormalizedNodeContainer<?, ?, ?>) {
- LOG.trace("Resolving subtree recursive event for {}, type {}", state.getPath(), eventFactory);
-
- // Node has children, so we will try to resolve it's children
- // changes.
- @SuppressWarnings("unchecked")
- NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> container =
- (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) node;
- for (NormalizedNode<PathArgument, ?> child : container.getValue()) {
- final PathArgument childId = child.getIdentifier();
-
- LOG.trace("Resolving event for child {}", childId);
- resolveSameEventRecursivelly(state.child(childId), child, eventFactory);
- }
- }
-
- final DOMImmutableDataChangeEvent event = eventFactory.create(state.getPath(), node);
- LOG.trace("Adding event {} at path {}", event, state.getPath());
- state.addEvent(event);
- state.collectEvents(event.getOriginalSubtree(), event.getUpdatedSubtree(), collectedEvents);
- }
-
- private boolean resolveSubtreeChangeEvent(final ResolveDataChangeState state,
- final DataTreeCandidateNode modification) {
- final Optional<NormalizedNode<?, ?>> maybeBefore = modification.getDataBefore();
- final Optional<NormalizedNode<?, ?>> maybeAfter = modification.getDataAfter();
-
- Preconditions.checkArgument(maybeBefore.isPresent(), "Subtree change with before-data not present at path %s",
- state.getPath());
- Preconditions.checkArgument(maybeAfter.isPresent(), "Subtree change with after-data not present at path %s",
- state.getPath());
-
- if (!state.needsProcessing()) {
- LOG.trace("Not processing modified subtree {}", state.getPath());
- return true;
- }
-
- DataChangeScope scope = null;
- for (DataTreeCandidateNode childMod : modification.getChildNodes()) {
- final ResolveDataChangeState childState = state.child(childMod.getIdentifier());
-
- switch (childMod.getModificationType()) {
- case APPEARED:
- case DELETE:
- case DISAPPEARED:
- case WRITE:
- if (resolveAnyChangeEvent(childState, childMod)) {
- scope = DataChangeScope.ONE;
- }
- break;
- case SUBTREE_MODIFIED:
- if (resolveSubtreeChangeEvent(childState, childMod) && scope == null) {
- scope = DataChangeScope.SUBTREE;
- }
- break;
- case UNMODIFIED:
- // no-op
- break;
- default:
- break;
- }
- }
-
- final NormalizedNode<?, ?> before = maybeBefore.get();
- final NormalizedNode<?, ?> after = maybeAfter.get();
-
- if (scope != null) {
- DOMImmutableDataChangeEvent one = DOMImmutableDataChangeEvent.builder(scope)
- .addUpdated(state.getPath(), before, after).build();
- state.addEvent(one);
- }
-
- state.collectEvents(before, after, collectedEvents);
- return scope != null;
- }
-
- public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate,
- final ListenerTree listenerTree) {
- return new ResolveDataChangeEventsTask(candidate, listenerTree);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
-import org.opendaylight.controller.sal.core.compat.DataChangeListenerRegistration;
-import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Recursion state used in {@link ResolveDataChangeEventsTask}. Instances of this
- * method track which listeners are affected by a particular change node. It takes
- * care of properly inheriting SUB/ONE listeners and also provides a means to
- * understand when actual processing need not occur.
- */
-final class ResolveDataChangeState {
- private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeState.class);
-
- /**
- * Inherited from all parents.
- */
- private final Iterable<Builder> inheritedSub;
-
- /**
- * Inherited from immediate parent.
- */
- private final Collection<Builder> inheritedOne;
- private final YangInstanceIdentifier nodeId;
- private final Collection<RegistrationTreeNode<DataChangeListenerRegistration<?>>> nodes;
-
- private final Map<DataChangeListenerRegistration<?>, Builder> subBuilders;
- private final Map<DataChangeListenerRegistration<?>, Builder> oneBuilders;
- private final Map<DataChangeListenerRegistration<?>, Builder> baseBuilders;
-
- private ResolveDataChangeState(final YangInstanceIdentifier nodeId,
- final Iterable<Builder> inheritedSub, final Collection<Builder> inheritedOne,
- final Collection<RegistrationTreeNode<DataChangeListenerRegistration<?>>> nodes) {
- this.nodeId = Preconditions.checkNotNull(nodeId);
- this.nodes = Preconditions.checkNotNull(nodes);
- this.inheritedSub = Preconditions.checkNotNull(inheritedSub);
- this.inheritedOne = Preconditions.checkNotNull(inheritedOne);
-
- /*
- * Collect the nodes which need to be propagated from us to the child.
- */
- final Map<DataChangeListenerRegistration<?>, Builder> sub = new HashMap<>();
- final Map<DataChangeListenerRegistration<?>, Builder> one = new HashMap<>();
- final Map<DataChangeListenerRegistration<?>, Builder> base = new HashMap<>();
- for (RegistrationTreeNode<DataChangeListenerRegistration<?>> n : nodes) {
- for (DataChangeListenerRegistration<?> l : n.getRegistrations()) {
- final Builder b = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE);
- switch (l.getScope()) {
- case BASE:
- base.put(l, b);
- break;
- case ONE:
- one.put(l, b);
- break;
- case SUBTREE:
- sub.put(l, b);
- break;
- default:
- break;
- }
- }
- }
-
- baseBuilders = maybeEmpty(base);
- oneBuilders = maybeEmpty(one);
- subBuilders = maybeEmpty(sub);
- }
-
- private static <K, V> Map<K, V> maybeEmpty(final Map<K, V> map) {
- if (map.isEmpty()) {
- return Collections.emptyMap();
- }
- return map;
- }
-
- /**
- * Create an initial state handle at a particular root node.
- *
- * @param rootId root instance identifier
- * @param registrationTreeNode root node
- */
- public static ResolveDataChangeState initial(final YangInstanceIdentifier rootId,
- final RegistrationTreeNode<DataChangeListenerRegistration<?>> registrationTreeNode) {
- return new ResolveDataChangeState(rootId, Collections.<Builder>emptyList(),
- Collections.<Builder>emptyList(), Collections.singletonList(registrationTreeNode));
- }
-
- /**
- * Create a state handle for iterating over a particular child.
- *
- * @param childId ID of the child
- * @return State handle
- */
- public ResolveDataChangeState child(final PathArgument childId) {
- /*
- * We instantiate a concatenation only when needed:
- *
- * 1) If our collection is empty, we reuse the parent's. This is typically the case
- * for intermediate node, which should be the vast majority.
- * 2) If the parent's iterable is a Collection and it is empty, reuse our collection.
- * This is the case for the first node which defines a subtree listener in a
- * particular subtree.
- * 3) Concatenate the two collections. This happens when we already have some
- * subtree listeners and we encounter a node which adds a few more.
- *
- * This allows us to lower number of objects allocated and also
- * speeds up Iterables.isEmpty() in needsProcessing().
- *
- * Note that the check for Collection in 2) relies on precisely this logic, which
- * ensures that we simply cannot see an empty concatenation, but rather start off with
- * an empty collection, then switch to a non-empty collection and finally switch to
- * a concatenation. This saves us from instantiating iterators, which a trivial
- * Iterables.isEmpty() would do as soon as we cross case 3).
- */
- final Iterable<Builder> sb;
- if (!subBuilders.isEmpty()) {
- if (inheritedSub instanceof Collection && ((Collection<?>) inheritedSub).isEmpty()) {
- sb = subBuilders.values();
- } else {
- sb = Iterables.concat(inheritedSub, subBuilders.values());
- }
- } else {
- sb = inheritedSub;
- }
-
- return new ResolveDataChangeState(nodeId.node(childId), sb,
- oneBuilders.values(), getListenerChildrenWildcarded(nodes, childId));
- }
-
- /**
- * Get the current path.
- *
- * @return Current path.
- */
- public YangInstanceIdentifier getPath() {
- return nodeId;
- }
-
- /**
- * Check if this child needs processing.
- *
- * @return True if processing needs to occur, false otherwise.
- */
- public boolean needsProcessing() {
- // May have underlying listeners, so we need to process
- if (!nodes.isEmpty()) {
- return true;
- }
- // Have ONE listeners
- if (!inheritedOne.isEmpty()) {
- return true;
- }
-
- /*
- * Have SUBTREE listeners
- *
- * This is slightly magical replacement for !Iterables.isEmpty(inheritedSub).
- * It relies on the logic in child(), which gives us the guarantee that when
- * inheritedSub is not a Collection, it is guaranteed to be non-empty (which
- * means we need to process). If it is a collection, we still need to check
- * it for emptiness.
- *
- * Unlike Iterables.isEmpty() this code does not instantiate any temporary
- * objects and is thus more efficient.
- */
- if (inheritedSub instanceof Collection) {
- return !((Collection<?>) inheritedSub).isEmpty();
- }
-
- // Non-Collection => non-empty => have to process
- return true;
- }
-
- /**
- * Add an event to all current listeners.
- */
- public void addEvent(final DOMImmutableDataChangeEvent event) {
- // Subtree builders get always notified
- for (Builder b : subBuilders.values()) {
- b.merge(event);
- }
- for (Builder b : inheritedSub) {
- b.merge(event);
- }
-
- if (event.getScope() == DataChangeScope.ONE || event.getScope() == DataChangeScope.BASE) {
- for (Builder b : oneBuilders.values()) {
- b.merge(event);
- }
- }
-
- if (event.getScope() == DataChangeScope.BASE) {
- for (Builder b : inheritedOne) {
- b.merge(event);
- }
- for (Builder b : baseBuilders.values()) {
- b.merge(event);
- }
- }
- }
-
- /**
- * Gather all non-empty events into the provided map.
- *
- * @param before before-image
- * @param after after-image
- * @param map target map
- */
- public void collectEvents(final NormalizedNode<?, ?> before, final NormalizedNode<?, ?> after,
- final Multimap<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> map) {
- for (Entry<DataChangeListenerRegistration<?>, Builder> e : baseBuilders.entrySet()) {
- final Builder b = e.getValue();
- if (!b.isEmpty()) {
- map.put(e.getKey(), b.setBefore(before).setAfter(after).build());
- }
- }
- for (Entry<DataChangeListenerRegistration<?>, Builder> e : oneBuilders.entrySet()) {
- final Builder b = e.getValue();
- if (!b.isEmpty()) {
- map.put(e.getKey(), b.setBefore(before).setAfter(after).build());
- }
- }
- for (Entry<DataChangeListenerRegistration<?>, Builder> e : subBuilders.entrySet()) {
- final Builder b = e.getValue();
- if (!b.isEmpty()) {
- map.put(e.getKey(), b.setBefore(before).setAfter(after).build());
- }
- }
-
- LOG.trace("Collected events {}", map);
- }
-
- private static Collection<RegistrationTreeNode<DataChangeListenerRegistration<?>>> getListenerChildrenWildcarded(
- final Collection<RegistrationTreeNode<DataChangeListenerRegistration<?>>> parentNodes,
- final PathArgument child) {
- if (parentNodes.isEmpty()) {
- return Collections.emptyList();
- }
-
- final List<RegistrationTreeNode<DataChangeListenerRegistration<?>>> result = new ArrayList<>();
- if (child instanceof NodeWithValue || child instanceof NodeIdentifierWithPredicates) {
- NodeIdentifier wildcardedIdentifier = new NodeIdentifier(child.getNodeType());
- addChildNodes(result, parentNodes, wildcardedIdentifier);
- }
- addChildNodes(result, parentNodes, child);
- return result;
- }
-
- private static void addChildNodes(final List<RegistrationTreeNode<DataChangeListenerRegistration<?>>> result,
- final Collection<RegistrationTreeNode<DataChangeListenerRegistration<?>>> parentNodes,
- final PathArgument childIdentifier) {
- for (RegistrationTreeNode<DataChangeListenerRegistration<?>> node : parentNodes) {
- RegistrationTreeNode<DataChangeListenerRegistration<?>> child = node.getExactChild(childIdentifier);
- if (child != null) {
- result.add(child);
- }
- }
- }
-}