The publishChanges method is only called from the
ShardDataTreeNotificationPublisherActor which is single-threaded so
publishChanges can't be called concurrently. However the
DefaultShardDataChangeListenerPublisher instance is passed via
the PublishNotifications message so the Stopwatch isn't thread safe
wrt thread visibility of its internal state. Therefore it's possible
the change in state done on thread 1 isn't immediately visible to
a subsequent thread. To alleviate this, I moved the Stopwatch and the
elapsed time check to the ShardDataTreeNotificationPublisherActor.
Change-Id: I046e7e92aa96eec01d5a355c8431ef797c534ead
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Notification);
String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Notification);
- notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props()
+ notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props(actorName)
.withDispatcher(dispatcher).withMailbox(
org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
}
.withDispatcher(dispatcher).withMailbox(
org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
}
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Stopwatch;
-import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
private final ListenerTree dataChangeListenerTree = ListenerTree.create();
private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
private final ListenerTree dataChangeListenerTree = ListenerTree.create();
- private final Stopwatch timer = Stopwatch.createUnstarted();
@Override
public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
@Override
public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
@Override
public void publishChanges(DataTreeCandidate candidate, String logContext) {
@Override
public void publishChanges(DataTreeCandidate candidate, String logContext) {
- timer.start();
-
- try {
- ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
- } finally {
- timer.stop();
- long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
- if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
- LOG.warn("{}: Generation of DataChange events took longer than expected. Elapsed time: {}",
- logContext, timer);
- } else {
- LOG.debug("{}: Elapsed time for generation of DataChange events: {}", logContext, timer);
- }
-
- timer.reset();
- }
+ ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Stopwatch;
import java.util.Collection;
import java.util.Collections;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
implements ShardDataTreeChangeListenerPublisher {
private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class);
implements ShardDataTreeChangeListenerPublisher {
private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class);
- private final Stopwatch timer = Stopwatch.createUnstarted();
-
@Override
public void publishChanges(final DataTreeCandidate candidate, String logContext) {
@Override
public void publishChanges(final DataTreeCandidate candidate, String logContext) {
- timer.start();
-
- try {
- processCandidateTree(candidate);
- } finally {
- timer.stop();
- long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
- if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
- LOG.warn("{}: Generation of DataTreeCandidateNode events took longer than expected. Elapsed time: {}",
- logContext, timer);
- } else {
- LOG.debug("{}: Elapsed time for generation of DataTreeCandidateNode events: {}", logContext, timer);
- }
-
- timer.reset();
- }
+ processCandidateTree(candidate);
package org.opendaylight.controller.cluster.datastore;
import akka.actor.Props;
package org.opendaylight.controller.cluster.datastore;
import akka.actor.Props;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
* @author Thomas Pantelis
*/
public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
* @author Thomas Pantelis
*/
public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
+ private final Stopwatch timer = Stopwatch.createUnstarted();
+ private final String name;
+
+ private ShardDataTreeNotificationPublisherActor(String name) {
+ this.name = name;
+ }
@Override
protected void handleReceive(Object message) {
if(message instanceof PublishNotifications) {
@Override
protected void handleReceive(Object message) {
if(message instanceof PublishNotifications) {
- ((PublishNotifications)message).publish();
+ PublishNotifications publisher = (PublishNotifications)message;
+ timer.start();
+
+ try {
+ publisher.publish();
+ } finally {
+ long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
+
+ if(elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
+ LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}",
+ publisher.logContext, name, timer);
+ } else {
+ LOG.debug("{}: Elapsed time for generation of change events for {}: {}", publisher.logContext,
+ name, timer);
+ }
+
+ timer.reset();
+ }
- static Props props() {
- return Props.create(ShardDataTreeNotificationPublisherActor.class);
+ static Props props(String notificationType) {
+ return Props.create(ShardDataTreeNotificationPublisherActor.class, notificationType);
}
static class PublishNotifications {
}
static class PublishNotifications {