import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
+
import java.util.Collections;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.Preconditions.checkState;
public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
TransactionReadyPrototype,AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+
+ @SuppressWarnings("rawtypes")
+ private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+ AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
+ new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+ AsyncDataChangeEvent>() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void invokeListener( AsyncDataChangeListener listener,
+ AsyncDataChangeEvent notification ) {
+ listener.onDataChanged(notification);
+ }
+ };
+
+ private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
+ "mdsal.datastore-dcl-notification-queue.size";
+
+ private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
+
private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
private final ListenerTree listenerTree = ListenerTree.create();
private final AtomicLong txCounter = new AtomicLong(0);
- private final ListeningExecutorService executor;
+ private final ListeningExecutorService listeningExecutor;
+
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+ dataChangeListenerNotificationManager;
+ private final ExecutorService dataChangeListenerExecutor;
private final String name;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
+ public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+ final ExecutorService dataChangeListenerExecutor) {
this.name = Preconditions.checkNotNull(name);
- this.executor = Preconditions.checkNotNull(executor);
+ this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
+
+ this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+
+ int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
+ DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
+
+ dataChangeListenerNotificationManager =
+ new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
+ DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
}
@Override
}
@Override
- public void close(){
- executor.shutdownNow();
+ public void close() {
+ ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+ ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
}
@Override
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
.setAfter(data) //
.addCreated(path, data) //
.build();
- executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
+
+ new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
+ dataChangeListenerNotificationManager).run();
}
}
@Override
public void close() {
- executor.shutdownNow();
-
+ // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
+ // by the outer class.
+ //listeningExecutor.shutdownNow();
}
protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
@Override
public ListenableFuture<Boolean> canCommit() {
- return executor.submit(new Callable<Boolean>() {
+ return listeningExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws TransactionCommitFailedException {
try {
@Override
public ListenableFuture<Void> preCommit() {
- return executor.submit(new Callable<Void>() {
+ return listeningExecutor.submit(new Callable<Void>() {
@Override
public Void call() {
candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+ listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
+ dataChangeListenerNotificationManager);
return null;
}
});
for (ChangeListenerNotifyTask task : listenerResolver.call()) {
LOG.trace("Scheduling invocation of listeners: {}", task);
- executor.submit(task);
+ task.run();
}
}