From a46305fbc6bb7ec6883c21298d356a5e4fbbb015 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 4 Feb 2016 01:39:20 -0500 Subject: [PATCH] Bug 4823: Offload generation of DCNs from Shard Generation of data change notifications can be expensive with large lists which can block the Shard actor for many seconds. This processing was offloaded to other actors to free up the Shard, one for DCLs and the other for DTCLs. I separated the 2 types of listeners b/c DCN generation is much more expensive than DTCs so at least DTCLs aren't held up by DCLs. Change-Id: I1bfb5d572c793f8eb703ebf0a7fd9bf628747168 Signed-off-by: Tom Pantelis --- .../main/resources/initial/factory-akka.conf | 4 +- ...taTreeNotificationPublisherActorProxy.java | 65 ++++++++++++++ ...faultShardDataChangeListenerPublisher.java | 87 +++++++++++++++++++ ...ShardDataTreeChangeListenerPublisher.java} | 42 +++++++-- .../datastore/DistributedDataStore.java | 2 +- .../controller/cluster/datastore/Shard.java | 4 +- .../ShardDataChangeListenerPublisher.java | 26 ++++++ ...DataChangeListenerPublisherActorProxy.java | 53 +++++++++++ .../cluster/datastore/ShardDataTree.java | 53 +++++------ .../ShardDataTreeChangeListenerPublisher.java | 19 ++++ ...TreeChangeListenerPublisherActorProxy.java | 51 +++++++++++ .../ShardDataTreeNotificationManager.java | 38 -------- .../ShardDataTreeNotificationPublisher.java | 22 +++++ ...ardDataTreeNotificationPublisherActor.java | 49 +++++++++++ .../cluster/datastore/utils/ActorContext.java | 2 +- 15 files changed, 444 insertions(+), 73 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java rename opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/{ShardDataTreeChangePublisher.java => DefaultShardDataTreeChangeListenerPublisher.java} (54%) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf index 9d80299618..e561b0eefc 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf @@ -2,8 +2,8 @@ odl-cluster-data { bounded-mailbox { mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" - mailbox-capacity = 1000 - mailbox-push-timeout-time = 100ms + mailbox-capacity = 5000 + mailbox-push-timeout-time = 10ms } metric-capture-enabled = true diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java new file mode 100644 index 0000000000..93081eb735 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for a ShardDataTreeNotificationPublisher that offloads the generation and publication + * of data tree notifications to an actor. + * + * @author Thomas Pantelis + */ +@NotThreadSafe +abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataTreeNotificationPublisher { + private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeNotificationPublisherActorProxy.class); + + private final ActorContext actorContext; + private final String actorName; + private ActorRef notifierActor; + + protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) { + this.actorContext = actorContext; + this.actorName = actorName; + } + + protected AbstractShardDataTreeNotificationPublisherActorProxy( + AbstractShardDataTreeNotificationPublisherActorProxy other) { + this.actorContext = null; + this.actorName = null; + this.notifierActor = other.getNotifierActor(); + } + + protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher(); + + @Override + public void publishChanges(DataTreeCandidate candidate, String logContext) { + getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications( + getDelegatePublisher(), candidate, logContext), ActorRef.noSender()); + } + + private ActorRef getNotifierActor() { + if(notifierActor == null) { + LOG.debug("Creating actor {}", actorName); + + String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath( + Dispatchers.DispatcherType.Notification); + notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props() + .withDispatcher(dispatcher).withMailbox( + org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName); + } + + return notifierActor; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java new file mode 100644 index 0000000000..cf98f38d07 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; +import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; +import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; +import org.opendaylight.yangtools.util.concurrent.NotificationManager; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation of ShardDataChangeListenerPublisher that directly generates and publishes + * notifications for DataChangeListeners. + * + * @author Thomas Pantelis + */ +@NotThreadSafe +final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeListenerPublisher, + NotificationManager, DOMImmutableDataChangeEvent> { + private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class); + + private final ListenerTree dataChangeListenerTree = ListenerTree.create(); + private final Stopwatch timer = Stopwatch.createUnstarted(); + + @Override + public void submitNotification(final DataChangeListenerRegistration listener, final DOMImmutableDataChangeEvent notification) { + LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification); + + listener.getInstance().onDataChanged(notification); + } + + @Override + public void submitNotifications(final DataChangeListenerRegistration listener, final Iterable notifications) { + final AsyncDataChangeListener> instance = listener.getInstance(); + LOG.debug("Notifying listener {} about {}", instance, notifications); + + for (DOMImmutableDataChangeEvent n : notifications) { + instance.onDataChanged(n); + } + } + + @Override + public void publishChanges(DataTreeCandidate candidate, String logContext) { + timer.start(); + + try { + ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this); + } finally { + timer.stop(); + long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS); + if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) { + LOG.warn("{}: Generation of DataChange events took longer than expected. Elapsed time: {}", + logContext, timer); + } else { + LOG.debug("{}: Elapsed time for generation of DataChange events: {}", logContext, timer); + } + + timer.reset(); + } + } + + @Override + public >> DataChangeListenerRegistration + registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) { + return dataChangeListenerTree.registerDataChangeListener(path, listener, scope); + } + + @Override + public ShardDataChangeListenerPublisher newInstance() { + return new DefaultShardDataChangeListenerPublisher(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java similarity index 54% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java index c1555b3257..217ffd358c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java @@ -7,24 +7,56 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Stopwatch; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Default implementation of ShardDataTreeChangeListenerPublisher that directly generates and publishes + * notifications for DataTreeChangeListeners. + * + * @author Thomas Pantelis + */ @NotThreadSafe -final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher { - private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class); +final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStoreTreeChangePublisher + implements ShardDataTreeChangeListenerPublisher { + private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class); + + private final Stopwatch timer = Stopwatch.createUnstarted(); + + @Override + public void publishChanges(final DataTreeCandidate candidate, String logContext) { + timer.start(); + + try { + processCandidateTree(candidate); + } finally { + timer.stop(); + long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS); + if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) { + LOG.warn("{}: Generation of DataTreeCandidateNode events took longer than expected. Elapsed time: {}", + logContext, timer); + } else { + LOG.debug("{}: Elapsed time for generation of DataTreeCandidateNode events: {}", logContext, timer); + } + + timer.reset(); + } + } - void publishChanges(final DataTreeCandidate candidate) { - processCandidateTree(candidate); + @Override + public ShardDataTreeChangeListenerPublisher newInstance() { + return new DefaultShardDataTreeChangeListenerPublisher(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 4b2013cd28..d31217042a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -239,7 +239,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, for(int i=0;i<100;i++) { try { return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox( - ActorContext.MAILBOX), shardManagerId); + ActorContext.BOUNDED_MAILBOX), shardManagerId); } catch (Exception e){ lastException = e; Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 439c2baac8..57e85570a9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -133,7 +133,9 @@ public class Shard extends RaftActor { LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent()); - store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType()); + store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(), + new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"), + new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name); shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java new file mode 100644 index 0000000000..291cca0313 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Interface for a class that generates and publishes notifications for DataChangeListeners. + * + * @author Thomas Pantelis + */ +interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher { + ShardDataChangeListenerPublisher newInstance(); + + >> DataChangeListenerRegistration + registerDataChangeListener(final YangInstanceIdentifier path,final L listener, final DataChangeScope scope); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java new file mode 100644 index 0000000000..0b898ede3c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorContext; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication + * of data change notifications to an actor. + * + * @author Thomas Pantelis + */ +@NotThreadSafe +class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy + implements ShardDataChangeListenerPublisher { + + private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher(); + + ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) { + super(actorContext, actorName); + } + + private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) { + super(other); + } + + @Override + public >> DataChangeListenerRegistration registerDataChangeListener( + YangInstanceIdentifier path, L listener, DataChangeScope scope) { + return delegatePublisher.registerDataChangeListener(path, listener, scope); + } + + @Override + public ShardDataChangeListenerPublisher newInstance() { + return new ShardDataChangeListenerPublisherActorProxy(this); + } + + @Override + protected ShardDataTreeNotificationPublisher getDelegatePublisher() { + return delegatePublisher; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 64eb6c87dc..ad5aab3453 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -19,8 +19,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; -import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; -import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -48,16 +46,27 @@ import org.slf4j.LoggerFactory; public class ShardDataTree extends ShardDataTreeTransactionParent { private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class); private static final YangInstanceIdentifier ROOT_PATH = YangInstanceIdentifier.builder().build(); - private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager(); + private final Map transactionChains = new HashMap<>(); - private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher(); - private final ListenerTree listenerTree = ListenerTree.create(); + private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher; + private final ShardDataChangeListenerPublisher dataChangeListenerPublisher; private final TipProducingDataTree dataTree; + private final String logContext; private SchemaContext schemaContext; - public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) { + public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType, + final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher, + final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) { dataTree = InMemoryDataTreeFactory.getInstance().create(treeType); updateSchemaContext(schemaContext); + this.treeChangeListenerPublisher = treeChangeListenerPublisher; + this.dataChangeListenerPublisher = dataChangeListenerPublisher; + this.logContext = logContext; + } + + public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) { + this(schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(), + new DefaultShardDataChangeListenerPublisher(), ""); } public TipProducingDataTree getDataTree() { @@ -102,33 +111,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } public void notifyListeners(final DataTreeCandidate candidate) { - LOG.debug("Notifying listeners on candidate {}", candidate); - - // DataTreeChanges first, as they are more light-weight - treeChangePublisher.publishChanges(candidate); - - // DataChanges second, as they are heavier - ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER); + treeChangeListenerPublisher.publishChanges(candidate, logContext); + dataChangeListenerPublisher.publishChanges(candidate, logContext); } void notifyOfInitialData(DataChangeListenerRegistration>> listenerReg, Optional currentState) { if(currentState.isPresent()) { - ListenerTree localListenerTree = ListenerTree.create(); - localListenerTree.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), + ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance(); + localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(), listenerReg.getScope()); - - ResolveDataChangeEventsTask.create(currentState.get(), localListenerTree).resolve(MANAGER); + localPublisher.publishChanges(currentState.get(), logContext); } } void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener, final Optional currentState) { if(currentState.isPresent()) { - ShardDataTreeChangePublisher localTreeChangePublisher = new ShardDataTreeChangePublisher(); - localTreeChangePublisher.registerTreeChangeListener(path, listener); - localTreeChangePublisher.publishChanges(currentState.get()); + ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance(); + localPublisher.registerTreeChangeListener(path, listener); + localPublisher.publishChanges(currentState.get(), logContext); } } @@ -145,7 +148,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { if (chain != null) { chain.close(); } else { - LOG.debug("Closing non-existent transaction chain {}", transactionChainId); + LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId); } } @@ -154,7 +157,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final AsyncDataChangeListener> listener, final DataChangeScope scope) { final DataChangeListenerRegistration>> reg = - listenerTree.registerDataChangeListener(path, listener, scope); + dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope); return new SimpleEntry<>(reg, readCurrentData()); } @@ -167,20 +170,20 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { public Entry, Optional> registerTreeChangeListener( final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) { - final ListenerRegistration reg = treeChangePublisher.registerTreeChangeListener( + final ListenerRegistration reg = treeChangeListenerPublisher.registerTreeChangeListener( path, listener); return new SimpleEntry<>(reg, readCurrentData()); } void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { - LOG.debug("Applying foreign transaction {}", identifier); + LOG.debug("{}: Applying foreign transaction {}", logContext, identifier); final DataTreeModification mod = dataTree.takeSnapshot().newModification(); DataTreeCandidates.applyToModification(mod, foreign); mod.ready(); - LOG.trace("Applying foreign modification {}", mod); + LOG.trace("{}: Applying foreign modification {}", logContext, mod); dataTree.validate(mod); final DataTreeCandidate candidate = dataTree.prepare(mod); dataTree.commit(candidate); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java new file mode 100644 index 0000000000..d4a5156d2b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; + +/** + * Interface for a class that generates and publishes notifications for DataTreeChangeListeners. + * + * @author Thomas Pantelis + */ +interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher { + ShardDataTreeChangeListenerPublisher newInstance(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java new file mode 100644 index 0000000000..2ac3ee8843 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorContext; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication + * of data tree change notifications to an actor. + * + * @author Thomas Pantelis + */ +@NotThreadSafe +class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy + implements ShardDataTreeChangeListenerPublisher { + + private final ShardDataTreeChangeListenerPublisher delegatePublisher = new DefaultShardDataTreeChangeListenerPublisher(); + + ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) { + super(actorContext, actorName); + } + + private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) { + super(other); + } + + @Override + public ListenerRegistration registerTreeChangeListener( + YangInstanceIdentifier treeId, L listener) { + return delegatePublisher.registerTreeChangeListener(treeId, listener); + } + + @Override + public ShardDataTreeChangeListenerPublisher newInstance() { + return new ShardDataTreeChangeListenerPublisherActorProxy(this); + } + + @Override + protected ShardDataTreeNotificationPublisher getDelegatePublisher() { + return delegatePublisher; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java deleted file mode 100644 index 8a54fc6231..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; -import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; -import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration; -import org.opendaylight.yangtools.util.concurrent.NotificationManager; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -final class ShardDataTreeNotificationManager implements NotificationManager, DOMImmutableDataChangeEvent> { - private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class); - - @Override - public void submitNotification(final DataChangeListenerRegistration listener, final DOMImmutableDataChangeEvent notification) { - LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification); - - listener.getInstance().onDataChanged(notification); - } - - @Override - public void submitNotifications(final DataChangeListenerRegistration listener, final Iterable notifications) { - final AsyncDataChangeListener> instance = listener.getInstance(); - LOG.debug("Notifying listener {} about {}", instance, notifications); - - for (DOMImmutableDataChangeEvent n : notifications) { - instance.onDataChanged(n); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java new file mode 100644 index 0000000000..e51b81cc94 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.concurrent.TimeUnit; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Interface for a class the publishes data tree notifications. + * + * @author Thomas Pantelis + */ +interface ShardDataTreeNotificationPublisher { + long PUBLISH_DELAY_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS); + + void publishChanges(final DataTreeCandidate candidate, String logContext); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java new file mode 100644 index 0000000000..e4e7eb33e9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Actor used to generate and publish data tree notifications. This is used to offload the potentially + * expensive notification generation from the Shard actor. + * + * @author Thomas Pantelis + */ +public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor { + + @Override + protected void handleReceive(Object message) { + if(message instanceof PublishNotifications) { + ((PublishNotifications)message).publish(); + } + } + + static Props props() { + return Props.create(ShardDataTreeNotificationPublisherActor.class); + } + + static class PublishNotifications { + private final ShardDataTreeNotificationPublisher publisher; + private final DataTreeCandidate candidate; + private final String logContext; + + PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate, + String logContext) { + this.publisher = publisher; + this.candidate = candidate; + this.logContext = logContext; + } + + private void publish() { + publisher.publishChanges(candidate, logContext); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 37726d201b..71e6b6491a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -83,7 +83,7 @@ public class ActorContext { return actualFailure; } }; - public static final String MAILBOX = "bounded-mailbox"; + public static final String BOUNDED_MAILBOX = "bounded-mailbox"; public static final String COMMIT = "commit"; private final ActorSystem actorSystem; -- 2.36.6