odl-cluster-data {
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
- mailbox-capacity = 1000
- mailbox-push-timeout-time = 100ms
+ mailbox-capacity = 5000
+ mailbox-push-timeout-time = 10ms
}
metric-capture-enabled = true
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base class for a ShardDataTreeNotificationPublisher that offloads the generation and publication
+ * of data tree notifications to an actor.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataTreeNotificationPublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeNotificationPublisherActorProxy.class);
+
+ private final ActorContext actorContext;
+ private final String actorName;
+ private ActorRef notifierActor;
+
+ protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName) {
+ this.actorContext = actorContext;
+ this.actorName = actorName;
+ }
+
+ protected AbstractShardDataTreeNotificationPublisherActorProxy(
+ AbstractShardDataTreeNotificationPublisherActorProxy other) {
+ this.actorContext = null;
+ this.actorName = null;
+ this.notifierActor = other.getNotifierActor();
+ }
+
+ protected abstract ShardDataTreeNotificationPublisher getDelegatePublisher();
+
+ @Override
+ public void publishChanges(DataTreeCandidate candidate, String logContext) {
+ getNotifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(
+ getDelegatePublisher(), candidate, logContext), ActorRef.noSender());
+ }
+
+ private ActorRef getNotifierActor() {
+ if(notifierActor == null) {
+ LOG.debug("Creating actor {}", actorName);
+
+ String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
+ Dispatchers.DispatcherType.Notification);
+ notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props()
+ .withDispatcher(dispatcher).withMailbox(
+ org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
+ }
+
+ return notifierActor;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of ShardDataChangeListenerPublisher that directly generates and publishes
+ * notifications for DataChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeListenerPublisher,
+ NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
+
+ private final ListenerTree dataChangeListenerTree = ListenerTree.create();
+ private final Stopwatch timer = Stopwatch.createUnstarted();
+
+ @Override
+ public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
+ LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+
+ listener.getInstance().onDataChanged(notification);
+ }
+
+ @Override
+ public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
+ LOG.debug("Notifying listener {} about {}", instance, notifications);
+
+ for (DOMImmutableDataChangeEvent n : notifications) {
+ instance.onDataChanged(n);
+ }
+ }
+
+ @Override
+ public void publishChanges(DataTreeCandidate candidate, String logContext) {
+ timer.start();
+
+ try {
+ ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
+ } finally {
+ timer.stop();
+ long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
+ if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
+ LOG.warn("{}: Generation of DataChange events took longer than expected. Elapsed time: {}",
+ logContext, timer);
+ } else {
+ LOG.debug("{}: Elapsed time for generation of DataChange events: {}", logContext, timer);
+ }
+
+ timer.reset();
+ }
+ }
+
+ @Override
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
+ registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+ return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+ }
+
+ @Override
+ public ShardDataChangeListenerPublisher newInstance() {
+ return new DefaultShardDataChangeListenerPublisher();
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Stopwatch;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Default implementation of ShardDataTreeChangeListenerPublisher that directly generates and publishes
+ * notifications for DataTreeChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
@NotThreadSafe
-final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
- private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class);
+final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStoreTreeChangePublisher
+ implements ShardDataTreeChangeListenerPublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class);
+
+ private final Stopwatch timer = Stopwatch.createUnstarted();
+
+ @Override
+ public void publishChanges(final DataTreeCandidate candidate, String logContext) {
+ timer.start();
+
+ try {
+ processCandidateTree(candidate);
+ } finally {
+ timer.stop();
+ long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
+ if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
+ LOG.warn("{}: Generation of DataTreeCandidateNode events took longer than expected. Elapsed time: {}",
+ logContext, timer);
+ } else {
+ LOG.debug("{}: Elapsed time for generation of DataTreeCandidateNode events: {}", logContext, timer);
+ }
+
+ timer.reset();
+ }
+ }
- void publishChanges(final DataTreeCandidate candidate) {
- processCandidateTree(candidate);
+ @Override
+ public ShardDataTreeChangeListenerPublisher newInstance() {
+ return new DefaultShardDataTreeChangeListenerPublisher();
}
@Override
for(int i=0;i<100;i++) {
try {
return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox(
- ActorContext.MAILBOX), shardManagerId);
+ ActorContext.BOUNDED_MAILBOX), shardManagerId);
} catch (Exception e){
lastException = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType());
+ store = new ShardDataTree(builder.getSchemaContext(), builder.getTreeType(),
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher"),
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher"), name);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Interface for a class that generates and publishes notifications for DataChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
+interface ShardDataChangeListenerPublisher extends ShardDataTreeNotificationPublisher {
+ ShardDataChangeListenerPublisher newInstance();
+
+ <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
+ registerDataChangeListener(final YangInstanceIdentifier path,final L listener, final DataChangeScope scope);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorContext;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Implementation of ShardDataChangeListenerPublisher that offloads the generation and publication
+ * of data change notifications to an actor.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+class ShardDataChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
+ implements ShardDataChangeListenerPublisher {
+
+ private final ShardDataChangeListenerPublisher delegatePublisher = new DefaultShardDataChangeListenerPublisher();
+
+ ShardDataChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
+ super(actorContext, actorName);
+ }
+
+ private ShardDataChangeListenerPublisherActorProxy(ShardDataChangeListenerPublisherActorProxy other) {
+ super(other);
+ }
+
+ @Override
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(
+ YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+ return delegatePublisher.registerDataChangeListener(path, listener, scope);
+ }
+
+ @Override
+ public ShardDataChangeListenerPublisher newInstance() {
+ return new ShardDataChangeListenerPublisherActorProxy(this);
+ }
+
+ @Override
+ protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
+ return delegatePublisher;
+ }
+}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class ShardDataTree extends ShardDataTreeTransactionParent {
private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
private static final YangInstanceIdentifier ROOT_PATH = YangInstanceIdentifier.builder().build();
- private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
+
private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
- private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
- private final ListenerTree listenerTree = ListenerTree.create();
+ private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
+ private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
private final TipProducingDataTree dataTree;
+ private final String logContext;
private SchemaContext schemaContext;
- public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) {
+ public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType,
+ final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
+ final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext) {
dataTree = InMemoryDataTreeFactory.getInstance().create(treeType);
updateSchemaContext(schemaContext);
+ this.treeChangeListenerPublisher = treeChangeListenerPublisher;
+ this.dataChangeListenerPublisher = dataChangeListenerPublisher;
+ this.logContext = logContext;
+ }
+
+ public ShardDataTree(final SchemaContext schemaContext, final TreeType treeType) {
+ this(schemaContext, treeType, new DefaultShardDataTreeChangeListenerPublisher(),
+ new DefaultShardDataChangeListenerPublisher(), "");
}
public TipProducingDataTree getDataTree() {
}
public void notifyListeners(final DataTreeCandidate candidate) {
- LOG.debug("Notifying listeners on candidate {}", candidate);
-
- // DataTreeChanges first, as they are more light-weight
- treeChangePublisher.publishChanges(candidate);
-
- // DataChanges second, as they are heavier
- ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
+ treeChangeListenerPublisher.publishChanges(candidate, logContext);
+ dataChangeListenerPublisher.publishChanges(candidate, logContext);
}
void notifyOfInitialData(DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> listenerReg, Optional<DataTreeCandidate> currentState) {
if(currentState.isPresent()) {
- ListenerTree localListenerTree = ListenerTree.create();
- localListenerTree.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
+ ShardDataChangeListenerPublisher localPublisher = dataChangeListenerPublisher.newInstance();
+ localPublisher.registerDataChangeListener(listenerReg.getPath(), listenerReg.getInstance(),
listenerReg.getScope());
-
- ResolveDataChangeEventsTask.create(currentState.get(), localListenerTree).resolve(MANAGER);
+ localPublisher.publishChanges(currentState.get(), logContext);
}
}
void notifyOfInitialData(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
final Optional<DataTreeCandidate> currentState) {
if(currentState.isPresent()) {
- ShardDataTreeChangePublisher localTreeChangePublisher = new ShardDataTreeChangePublisher();
- localTreeChangePublisher.registerTreeChangeListener(path, listener);
- localTreeChangePublisher.publishChanges(currentState.get());
+ ShardDataTreeChangeListenerPublisher localPublisher = treeChangeListenerPublisher.newInstance();
+ localPublisher.registerTreeChangeListener(path, listener);
+ localPublisher.publishChanges(currentState.get(), logContext);
}
}
if (chain != null) {
chain.close();
} else {
- LOG.debug("Closing non-existent transaction chain {}", transactionChainId);
+ LOG.debug("{}: Closing non-existent transaction chain {}", logContext, transactionChainId);
}
}
final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
final DataChangeScope scope) {
final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
- listenerTree.registerDataChangeListener(path, listener, scope);
+ dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
return new SimpleEntry<>(reg, readCurrentData());
}
public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(
+ final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
path, listener);
return new SimpleEntry<>(reg, readCurrentData());
}
void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
- LOG.debug("Applying foreign transaction {}", identifier);
+ LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
final DataTreeModification mod = dataTree.takeSnapshot().newModification();
DataTreeCandidates.applyToModification(mod, foreign);
mod.ready();
- LOG.trace("Applying foreign modification {}", mod);
+ LOG.trace("{}: Applying foreign modification {}", logContext, mod);
dataTree.validate(mod);
final DataTreeCandidate candidate = dataTree.prepare(mod);
dataTree.commit(candidate);
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+
+/**
+ * Interface for a class that generates and publishes notifications for DataTreeChangeListeners.
+ *
+ * @author Thomas Pantelis
+ */
+interface ShardDataTreeChangeListenerPublisher extends ShardDataTreeNotificationPublisher, DOMStoreTreeChangePublisher {
+ ShardDataTreeChangeListenerPublisher newInstance();
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorContext;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Implementation of ShardDataTreeChangeListenerPublisher that offloads the generation and publication
+ * of data tree change notifications to an actor.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
+ implements ShardDataTreeChangeListenerPublisher {
+
+ private final ShardDataTreeChangeListenerPublisher delegatePublisher = new DefaultShardDataTreeChangeListenerPublisher();
+
+ ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
+ super(actorContext, actorName);
+ }
+
+ private ShardDataTreeChangeListenerPublisherActorProxy(ShardDataTreeChangeListenerPublisherActorProxy other) {
+ super(other);
+ }
+
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ YangInstanceIdentifier treeId, L listener) {
+ return delegatePublisher.registerTreeChangeListener(treeId, listener);
+ }
+
+ @Override
+ public ShardDataTreeChangeListenerPublisher newInstance() {
+ return new ShardDataTreeChangeListenerPublisherActorProxy(this);
+ }
+
+ @Override
+ protected ShardDataTreeNotificationPublisher getDelegatePublisher() {
+ return delegatePublisher;
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
-import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class ShardDataTreeNotificationManager implements NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
- private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class);
-
- @Override
- public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
- LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
-
- listener.getInstance().onDataChanged(notification);
- }
-
- @Override
- public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
- final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
- LOG.debug("Notifying listener {} about {}", instance, notifications);
-
- for (DOMImmutableDataChangeEvent n : notifications) {
- instance.onDataChanged(n);
- }
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Interface for a class the publishes data tree notifications.
+ *
+ * @author Thomas Pantelis
+ */
+interface ShardDataTreeNotificationPublisher {
+ long PUBLISH_DELAY_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
+
+ void publishChanges(final DataTreeCandidate candidate, String logContext);
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Actor used to generate and publish data tree notifications. This is used to offload the potentially
+ * expensive notification generation from the Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeNotificationPublisherActor extends AbstractUntypedActor {
+
+ @Override
+ protected void handleReceive(Object message) {
+ if(message instanceof PublishNotifications) {
+ ((PublishNotifications)message).publish();
+ }
+ }
+
+ static Props props() {
+ return Props.create(ShardDataTreeNotificationPublisherActor.class);
+ }
+
+ static class PublishNotifications {
+ private final ShardDataTreeNotificationPublisher publisher;
+ private final DataTreeCandidate candidate;
+ private final String logContext;
+
+ PublishNotifications(ShardDataTreeNotificationPublisher publisher, DataTreeCandidate candidate,
+ String logContext) {
+ this.publisher = publisher;
+ this.candidate = candidate;
+ this.logContext = logContext;
+ }
+
+ private void publish() {
+ publisher.publishChanges(candidate, logContext);
+ }
+ }
+}
return actualFailure;
}
};
- public static final String MAILBOX = "bounded-mailbox";
+ public static final String BOUNDED_MAILBOX = "bounded-mailbox";
public static final String COMMIT = "commit";
private final ActorSystem actorSystem;