X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDefaultShardDataChangeListenerPublisher.java;h=89da5b2621b7322eda587c889c8c0a376742421c;hb=27b168d3ca3807123b4877f1ad0662b2610f393d;hp=cf98f38d0789d3a8d9cede7beb6d407e2659186f;hpb=a46305fbc6bb7ec6883c21298d356a5e4fbbb015;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java index cf98f38d07..89da5b2621 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java @@ -7,8 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Stopwatch; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Optional; +import java.util.function.Consumer; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -16,6 +16,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeE import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.util.concurrent.NotificationManager; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -35,19 +36,25 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class); private final ListenerTree dataChangeListenerTree = ListenerTree.create(); - private final Stopwatch timer = Stopwatch.createUnstarted(); + private final String logContext; + + DefaultShardDataChangeListenerPublisher(String logContext) { + this.logContext = logContext; + } @Override - public void submitNotification(final DataChangeListenerRegistration listener, final DOMImmutableDataChangeEvent notification) { - LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification); + public void submitNotification(final DataChangeListenerRegistration listener, + final DOMImmutableDataChangeEvent notification) { + LOG.debug("{}: Notifying listener {} about {}", logContext, listener.getInstance(), notification); listener.getInstance().onDataChanged(notification); } @Override - public void submitNotifications(final DataChangeListenerRegistration listener, final Iterable notifications) { + public void submitNotifications(final DataChangeListenerRegistration listener, + final Iterable notifications) { final AsyncDataChangeListener> instance = listener.getInstance(); - LOG.debug("Notifying listener {} about {}", instance, notifications); + LOG.debug("{}: Notifying listener {} about {}", logContext, instance, notifications); for (DOMImmutableDataChangeEvent n : notifications) { instance.onDataChanged(n); @@ -55,33 +62,31 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi } @Override - public void publishChanges(DataTreeCandidate candidate, String logContext) { - timer.start(); - - try { - ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this); - } finally { - timer.stop(); - long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS); - if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) { - LOG.warn("{}: Generation of DataChange events took longer than expected. Elapsed time: {}", - logContext, timer); - } else { - LOG.debug("{}: Elapsed time for generation of DataChange events: {}", logContext, timer); - } - - timer.reset(); - } + public void publishChanges(DataTreeCandidate candidate) { + ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this); } @Override - public >> DataChangeListenerRegistration - registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) { - return dataChangeListenerTree.registerDataChangeListener(path, listener, scope); + public void registerDataChangeListener(YangInstanceIdentifier path, + AsyncDataChangeListener> listener, DataChangeScope scope, + Optional initialState, + Consumer>>> + onRegistration) { + final DataChangeListenerRegistration>> + registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope); + + onRegistration.accept(registration); + + if (initialState.isPresent()) { + notifySingleListener(path, listener, scope, initialState.get(), logContext); + } } - @Override - public ShardDataChangeListenerPublisher newInstance() { - return new DefaultShardDataChangeListenerPublisher(); + static void notifySingleListener(final YangInstanceIdentifier path, + final AsyncDataChangeListener> listener, + final DataChangeScope scope, final DataTreeCandidate initialState, String logContext) { + DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(logContext); + publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { }); + publisher.publishChanges(initialState); } }