Bug 4823: Offload generation of DCNs from Shard 96/34096/5
authorTom Pantelis <tpanteli@brocade.com>
Thu, 4 Feb 2016 06:39:20 +0000 (01:39 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 11 Mar 2016 07:35:13 +0000 (02:35 -0500)
Generation of data change notifications can be expensive with large
lists which can block the Shard actor for many seconds. This processing
was offloaded to other actors to free up the Shard, one for DCLs and the
other for DTCLs. I separated the 2 types of listeners b/c DCN generation
is much more expensive than DTCs so at least DTCLs aren't held up by
DCLs.

Change-Id: I1bfb5d572c793f8eb703ebf0a7fd9bf628747168
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
15 files changed:
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java with 54% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java

index 9d80299..e561b0e 100644 (file)
@@ -2,8 +2,8 @@
 odl-cluster-data {
   bounded-mailbox {
     mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
-    mailbox-capacity = 1000
-    mailbox-push-timeout-time = 100ms
+    mailbox-capacity = 5000
+    mailbox-push-timeout-time = 10ms
   }
 
   metric-capture-enabled = true
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeNotificationPublisherActorProxy.java
new file mode 100644 (file)
index 0000000..93081eb
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for a ShardDataTreeNotificationPublisher that offloads the generation and publication
+ * of data tree notifications to an actor.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataTreeNotificationPublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeNotificationPublisherActorProxy.class);
+
+    private final ActorContext actorContext;
+    private final String actorName;
+    private ActorRef notifierActor;
+
+    protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) {
+        this.actorContext = actorContext;
+        this.actorName = actorName;
+    }
+
+    protected AbstractShardDataTreeNotificationPublisherActorProxy(
+            AbstractShardDataTreeNotificationPublisherActorProxy other) {
+        this.actorContext = null;
+        this.actorName = null;
+        this.notifierActor = other.getNotifierActor();
+    }
+
+    protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher();
+
+    @Override
+    public void publishChanges(DataTreeCandidate candidate, String logContext) {
+        getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(
+                getDelegatePublisher(), candidate, logContext), ActorRef.noSender());
+    }
+
+    private ActorRef getNotifierActor() {
+        if(notifierActor == null) {
+            LOG.debug("Creating actor {}", actorName);
+
+            String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
+                    Dispatchers.DispatcherType.Notification);
+            notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props()
+                    .withDispatcher(dispatcher).withMailbox(
+                            org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
+        }
+
+        return notifierActor;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataChangeListenerPublisher.java
new file mode 100644 (file)
index 0000000..cf98f38
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of ShardDataChangeListenerPublisher that directly generates and publishes
+ * notifications for DataChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeListenerPublisher,
+        NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
+
+    private final ListenerTree dataChangeListenerTree = ListenerTree.create();
+    private final Stopwatch timer = Stopwatch.createUnstarted();
+
+    @Override
+    public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
+        LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+
+        listener.getInstance().onDataChanged(notification);
+    }
+
+    @Override
+    public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
+        LOG.debug("Notifying listener {} about {}", instance, notifications);
+
+        for (DOMImmutableDataChangeEvent n : notifications) {
+            instance.onDataChanged(n);
+        }
+    }
+
+    @Override
+    public void publishChanges(DataTreeCandidate candidate, String logContext) {
+        timer.start();
+
+        try {
+            ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
+        } finally {
+            timer.stop();
+            long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
+            if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
+                LOG.warn("{}: Generation of DataChange events took longer than expected. Elapsed time: {}",
+                        logContext, timer);
+            } else {
+                LOG.debug("{}: Elapsed time for generation of DataChange events: {}", logContext, timer);
+            }
+
+            timer.reset();
+        }
+    }
+
+    @Override
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
+            registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+        return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+    }
+
+    @Override
+    public ShardDataChangeListenerPublisher newInstance() {
+        return new DefaultShardDataChangeListenerPublisher();
+    }
+}
@@ -7,24 +7,56 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Stopwatch;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Default implementation of ShardDataTreeChangeListenerPublisher that directly generates and publishes
+ * notifications for DataTreeChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
 @NotThreadSafe
-final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class);
+final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStoreTreeChangePublisher
+        implements ShardDataTreeChangeListenerPublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class);
+
+    private final Stopwatch timer = Stopwatch.createUnstarted();
+
+    @Override
+    public void publishChanges(final DataTreeCandidate candidate, String logContext) {
+        timer.start();
+
+        try {
+            processCandidateTree(candidate);
+        } finally {
+            timer.stop();
+            long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
+            if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
+                LOG.warn("{}: Generation of DataTreeCandidateNode events took longer than expected. Elapsed time: {}",
+                        logContext, timer);
+            } else {
+                LOG.debug("{}: Elapsed time for generation of DataTreeCandidateNode events: {}", logContext, timer);
+            }
+
+            timer.reset();
+        }
+    }
 
-    void publishChanges(final DataTreeCandidate candidate) {
-        processCandidateTree(candidate);
+    @Override
+    public ShardDataTreeChangeListenerPublisher newInstance() {
+        return new DefaultShardDataTreeChangeListenerPublisher();
     }
 
     @Override
index 4b2013c..d312170 100644 (file)
@@ -239,7 +239,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         for(int i=0;i<100;i++) {
             try {
                 return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox(
-                        ActorContext.MAILBOX), shardManagerId);
+                        ActorContext.BOUNDED_MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
index 439c2ba..57e8557 100644 (file)
@@ -133,7 +133,9 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
-        store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType());
+        store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(),
+                new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"),
+                new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name);
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisher.java
new file mode 100644 (file)
index 0000000..291cca0
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Interface for a class that generates and publishes notifications for DataChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
+interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
+    ShardDataChangeListenerPublisher newInstance();
+
+    <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
+            registerDataChangeListener(final YangInstanceIdentifier path,final L listener, final DataChangeScope scope);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataChangeListenerPublisherActorProxy.java
new file mode 100644 (file)
index 0000000..0b898ed
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorContext;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
+ * of data change notifications to an actor.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
+        implements ShardDataChangeListenerPublisher {
+
+    private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher();
+
+    ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
+        super(actorContext, actorName);
+    }
+
+    private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) {
+        super(other);
+    }
+
+    @Override
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(
+            YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+        return delegatePublisher.registerDataChangeListener(path, listener, scope);
+    }
+
+    @Override
+    public ShardDataChangeListenerPublisher newInstance() {
+        return new ShardDataChangeListenerPublisherActorProxy(this);
+    }
+
+    @Override
+    protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
+        return delegatePublisher;
+    }
+}
index 64eb6c8..ad5aab3 100644 (file)
@@ -19,8 +19,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataCh
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -48,16 +46,27 @@ import org.slf4j.LoggerFactory;
 public class ShardDataTree extends ShardDataTreeTransactionParent {
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
     private static final YangInstanceIdentifier ROOT_PATH = YangInstanceIdentifier.builder().build();
-    private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
+
     private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
-    private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
-    private final ListenerTree listenerTree = ListenerTree.create();
+    private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
+    private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final TipProducingDataTree dataTree;
+    private final String logContext;
     private SchemaContext schemaContext;
 
-    public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) {
+    public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType,
+            final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
+            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
         dataTree = InMemoryDataTreeFactory.getInstance().create(treeType);
         updateSchemaContext(schemaContext);
+        this.treeChangeListenerPublisher = treeChangeListenerPublisher;
+        this.dataChangeListenerPublisher = dataChangeListenerPublisher;
+        this.logContext = logContext;
+    }
+
+    public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) {
+        this(schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+                new DefaultShardDataChangeListenerPublisher(), "");
     }
 
     public TipProducingDataTree getDataTree() {
@@ -102,33 +111,27 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     public void notifyListeners(final DataTreeCandidate candidate) {
-        LOG.debug("Notifying listeners on candidate {}", candidate);
-
-        // DataTreeChanges first, as they are more light-weight
-        treeChangePublisher.publishChanges(candidate);
-
-        // DataChanges second, as they are heavier
-        ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
+        treeChangeListenerPublisher.publishChanges(candidate, logContext);
+        dataChangeListenerPublisher.publishChanges(candidate, logContext);
     }
 
     void notifyOfInitialData(DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
             NormalizedNode<?, ?>>> listenerReg, Optional<DataTreeCandidate> currentState) {
 
         if(currentState.isPresent()) {
-            ListenerTree localListenerTree = ListenerTree.create();
-            localListenerTree.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
+            ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
+            localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
                     listenerReg.getScope());
-
-            ResolveDataChangeEventsTask.create(currentState.get(), localListenerTree).resolve(MANAGER);
+            localPublisher.publishChanges(currentState.get(), logContext);
         }
     }
 
     void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
             final Optional<DataTreeCandidate> currentState) {
         if(currentState.isPresent()) {
-            ShardDataTreeChangePublisher localTreeChangePublisher = new ShardDataTreeChangePublisher();
-            localTreeChangePublisher.registerTreeChangeListener(path, listener);
-            localTreeChangePublisher.publishChanges(currentState.get());
+            ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
+            localPublisher.registerTreeChangeListener(path, listener);
+            localPublisher.publishChanges(currentState.get(), logContext);
         }
     }
 
@@ -145,7 +148,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain != null) {
             chain.close();
         } else {
-            LOG.debug("Closing non-existent transaction chain {}", transactionChainId);
+            LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
         }
     }
 
@@ -154,7 +157,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
                     final DataChangeScope scope) {
         final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
-                listenerTree.registerDataChangeListener(path, listener, scope);
+                dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
 
         return new SimpleEntry<>(reg, readCurrentData());
     }
@@ -167,20 +170,20 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
             final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
-        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(
+        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
                 path, listener);
 
         return new SimpleEntry<>(reg, readCurrentData());
     }
 
     void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
-        LOG.debug("Applying foreign transaction {}", identifier);
+        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
         final DataTreeModification mod = dataTree.takeSnapshot().newModification();
         DataTreeCandidates.applyToModification(mod, foreign);
         mod.ready();
 
-        LOG.trace("Applying foreign modification {}", mod);
+        LOG.trace("{}: Applying foreign modification {}", logContext, mod);
         dataTree.validate(mod);
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisher.java
new file mode 100644 (file)
index 0000000..d4a5156
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+
+/**
+ * Interface for a class that generates and publishes notifications for DataTreeChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
+interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher {
+    ShardDataTreeChangeListenerPublisher newInstance();
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangeListenerPublisherActorProxy.java
new file mode 100644 (file)
index 0000000..2ac3ee8
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorContext;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication
+ * of data tree change notifications to an actor.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
+        implements ShardDataTreeChangeListenerPublisher {
+
+    private final ShardDataTreeChangeListenerPublisher delegatePublisher = new DefaultShardDataTreeChangeListenerPublisher();
+
+    ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
+        super(actorContext, actorName);
+    }
+
+    private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) {
+        super(other);
+    }
+
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            YangInstanceIdentifier treeId, L listener) {
+        return delegatePublisher.registerTreeChangeListener(treeId, listener);
+    }
+
+    @Override
+    public ShardDataTreeChangeListenerPublisher newInstance() {
+        return new ShardDataTreeChangeListenerPublisherActorProxy(this);
+    }
+
+    @Override
+    protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
+        return delegatePublisher;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java
deleted file mode 100644 (file)
index 8a54fc6..0000000
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class ShardDataTreeNotificationManager implements NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class);
-
-    @Override
-    public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
-        LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
-
-        listener.getInstance().onDataChanged(notification);
-    }
-
-    @Override
-    public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
-        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
-        LOG.debug("Notifying listener {} about {}", instance, notifications);
-
-        for (DOMImmutableDataChangeEvent n : notifications) {
-            instance.onDataChanged(n);
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisher.java
new file mode 100644 (file)
index 0000000..e51b81c
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Interface for a class the publishes data tree notifications.
+ *
+ * @author Thomas Pantelis
+ */
+interface ShardDataTreeNotificationPublisher {
+    long PUBLISH_DELAY_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
+
+    void publishChanges(final DataTreeCandidate candidate, String logContext);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationPublisherActor.java
new file mode 100644 (file)
index 0000000..e4e7eb3
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish data tree notifications. This is used to offload the potentially
+ * expensive notification generation from the Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
+
+    @Override
+    protected void handleReceive(Object message) {
+        if(message instanceof PublishNotifications) {
+            ((PublishNotifications)message).publish();
+        }
+    }
+
+    static Props props() {
+        return Props.create(ShardDataTreeNotificationPublisherActor.class);
+    }
+
+    static class PublishNotifications {
+        private final ShardDataTreeNotificationPublisher publisher;
+        private final DataTreeCandidate candidate;
+        private final String logContext;
+
+        PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate,
+                String logContext) {
+            this.publisher = publisher;
+            this.candidate = candidate;
+            this.logContext = logContext;
+        }
+
+        private void publish() {
+            publisher.publishChanges(candidate, logContext);
+        }
+    }
+}
index 37726d2..71e6b64 100644 (file)
@@ -83,7 +83,7 @@ public class ActorContext {
             return actualFailure;
         }
     };
-    public static final String MAILBOX = "bounded-mailbox";
+    public static final String BOUNDED_MAILBOX = "bounded-mailbox";
     public static final String COMMIT = "commit";
 
     private final ActorSystem actorSystem;