X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDefaultShardDataTreeChangeListenerPublisher.java;h=1b8ed12feda00d079d04e8f8b06f25eb1b319b98;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hp=217ffd358c5358fa3f7a883d04d4d014f6c0eeee;hpb=a46305fbc6bb7ec6883c21298d356a5e4fbbb015;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java index 217ffd358c..1b8ed12fed 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java @@ -7,71 +7,89 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Stopwatch; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; -import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; +import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Default implementation of ShardDataTreeChangeListenerPublisher that directly generates and publishes - * notifications for DataTreeChangeListeners. + * notifications for DataTreeChangeListeners. This class is NOT thread-safe. * * @author Thomas Pantelis */ -@NotThreadSafe final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStoreTreeChangePublisher implements ShardDataTreeChangeListenerPublisher { private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class); + private String logContext; - private final Stopwatch timer = Stopwatch.createUnstarted(); + DefaultShardDataTreeChangeListenerPublisher(final String logContext) { + this.logContext = logContext; + } @Override - public void publishChanges(final DataTreeCandidate candidate, String logContext) { - timer.start(); + public void publishChanges(final DataTreeCandidate candidate) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}: publishChanges: {}", logContext, candidate); + } else { + LOG.debug("{}: publishChanges: rootPath: {}", logContext, candidate.getRootPath()); + } - try { - processCandidateTree(candidate); - } finally { - timer.stop(); - long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS); - if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) { - LOG.warn("{}: Generation of DataTreeCandidateNode events took longer than expected. Elapsed time: {}", - logContext, timer); - } else { - LOG.debug("{}: Elapsed time for generation of DataTreeCandidateNode events: {}", logContext, timer); - } + processCandidateTree(candidate); + } - timer.reset(); - } + @Override + protected void notifyListener(final AbstractDOMDataTreeChangeListenerRegistration registration, + final List changes) { + LOG.debug("{}: notifyListener: listener: {}", logContext, registration.getInstance()); + registration.getInstance().onDataTreeChanged(changes); } @Override - public ShardDataTreeChangeListenerPublisher newInstance() { - return new DefaultShardDataTreeChangeListenerPublisher(); + protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { + LOG.debug("Registration {} removed", registration); } @Override - protected void notifyListeners(final Collection> registrations, - final YangInstanceIdentifier path, final DataTreeCandidateNode node) { - final Collection changes = Collections.singleton( - DataTreeCandidates.newDataTreeCandidate(path, node)); + public void registerTreeChangeListener(final YangInstanceIdentifier treeId, + final DOMDataTreeChangeListener listener, final Optional initialState, + final Consumer> onRegistration) { + registerTreeChangeListener(treeId, listener, onRegistration); - for (AbstractDOMDataTreeChangeListenerRegistration reg : registrations) { - reg.getInstance().onDataTreeChanged(changes); + if (initialState.isPresent()) { + notifySingleListener(treeId, listener, initialState.get(), logContext); + } else { + listener.onInitialData(); } } - @Override - protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { - LOG.debug("Registration {} removed", registration); + void registerTreeChangeListener(final YangInstanceIdentifier treeId, final DOMDataTreeChangeListener listener, + final Consumer> onRegistration) { + LOG.debug("{}: registerTreeChangeListener: path: {}, listener: {}", logContext, treeId, listener); + + AbstractDOMDataTreeChangeListenerRegistration registration = + super.registerTreeChangeListener(treeId, listener); + + onRegistration.accept(registration); + } + + static void notifySingleListener(final YangInstanceIdentifier treeId, final DOMDataTreeChangeListener listener, + final DataTreeCandidate state, final String logContext) { + LOG.debug("{}: notifySingleListener: path: {}, listener: {}", logContext, treeId, listener); + DefaultShardDataTreeChangeListenerPublisher publisher = + new DefaultShardDataTreeChangeListenerPublisher(logContext); + publisher.logContext = logContext; + publisher.registerTreeChangeListener(treeId, listener); + + if (!publisher.processCandidateTree(state)) { + listener.onInitialData(); + } } }