*/
package org.opendaylight.mdsal.dom.broker;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableMultimap.Builder;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* #offerNotification(DOMNotification, long, TimeUnit)}
* is realized by arming a background wakeup interrupt.
*/
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
- private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
+ private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
1L, 30L, TimeUnit.MILLISECONDS);
private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS =
private static final EventHandler<DOMNotificationRouterEvent> NOTIFY_FUTURE =
(event, sequence, endOfBatch) -> event.setFuture();
- private final Disruptor<DOMNotificationRouterEvent> disruptor;
- private final ExecutorService executor;
- private volatile Multimap<SchemaPath, ListenerRegistration<? extends
- DOMNotificationListener>> listeners = ImmutableMultimap.of();
private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
ListenerRegistry.create();
+ private final Disruptor<DOMNotificationRouterEvent> disruptor;
+ private final ScheduledThreadPoolExecutor observer;
+ private final ExecutorService executor;
- @SuppressWarnings("unchecked")
- private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
- this.executor = Preconditions.checkNotNull(executor);
+ private volatile Multimap<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
+ ImmutableMultimap.of();
- disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
- queueDepth, executor, ProducerType.MULTI, strategy);
+ @VisibleForTesting
+ DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) {
+ observer = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build());
+ executor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build());
+ disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth,
+ new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(),
+ ProducerType.MULTI, strategy);
disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
disruptor.start();
}
public static DOMNotificationRouter create(final int queueDepth) {
- final ExecutorService executor = Executors.newCachedThreadPool();
-
- return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY);
+ return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY);
}
- public static DOMNotificationRouter create(final int queueDepth, final long spinTime,
- final long parkTime, final TimeUnit unit) {
- final ExecutorService executor = Executors.newCachedThreadPool();
- final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
-
- return new DOMNotificationRouter(executor, queueDepth, strategy);
+ public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime,
+ final TimeUnit unit) {
+ checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth),
+ "Queue depth %s is not power-of-two", queueDepth);
+ return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit));
}
@Override
public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
final T listener, final Collection<SchemaPath> types) {
- final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
+ final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
@Override
protected void removeRegistration() {
synchronized (DOMNotificationRouter.this) {
};
if (!types.isEmpty()) {
- final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b =
+ final Builder<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
ImmutableMultimap.builder();
b.putAll(listeners);
* @param newListeners is used to notify listenerTypes changed
*/
private void replaceListeners(
- final Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> newListeners) {
+ final Multimap<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
listeners = newListeners;
notifyListenerTypesChanged(newListeners.keySet());
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
- final List<ListenerRegistration<DOMNotificationSubscriptionListener>> listenersAfter =
- ImmutableList.copyOf(subscriptionListeners.getListeners());
- executor.submit(() -> {
- for (final ListenerRegistration<DOMNotificationSubscriptionListener> subListener : listenersAfter) {
+ final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
+ subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
+ executor.execute(() -> {
+ for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
try {
- subListener.getInstance().onSubscriptionChanged(typesAfter);
+ subListener.onSubscriptionChanged(typesAfter);
} catch (final Exception e) {
- LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e);
+ LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
}
}
});
public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
final L listener) {
final Set<SchemaPath> initialTypes = listeners.keySet();
- executor.submit(() -> listener.onSubscriptionChanged(initialTypes));
- return subscriptionListeners.registerWithType(listener);
+ executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
+ return subscriptionListeners.register(listener);
}
private ListenableFuture<Void> publish(final long seq, final DOMNotification notification,
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final DOMNotificationRouterEvent event = disruptor.get(seq);
final ListenableFuture<Void> future = event.initialize(notification, subscribers);
disruptor.getRingBuffer().publish(seq);
@Override
public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
throws InterruptedException {
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
listeners.get(notification.getType());
if (subscribers.isEmpty()) {
return NO_LISTENERS;
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ @VisibleForTesting
+ ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final long seq;
try {
seq = disruptor.getRingBuffer().tryNext();
@Override
public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
listeners.get(notification.getType());
if (subscribers.isEmpty()) {
return NO_LISTENERS;
@Override
public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
final TimeUnit unit) throws InterruptedException {
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
listeners.get(notification.getType());
if (subscribers.isEmpty()) {
return NO_LISTENERS;
}
-
// Attempt to perform a non-blocking publish first
- final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+ final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
return noBlock;
}
- /*
- * FIXME: we need a background thread, which will watch out for blocking too long. Here
- * we will arm a tasklet for it and synchronize delivery of interrupt properly.
- */
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ final Thread publishThread = Thread.currentThread();
+ ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+ final ListenableFuture<?> withBlock = putNotification(notification);
+ timerTask.cancel(true);
+ if (observer.getQueue().size() > 50) {
+ observer.purge();
+ }
+ return withBlock;
+ } catch (InterruptedException e) {
+ return DOMNotificationPublishService.REJECTED;
+ }
}
@Override
public void close() {
disruptor.shutdown();
+ observer.shutdown();
executor.shutdown();
}
return executor;
}
+ @VisibleForTesting
+ ExecutorService observer() {
+ return observer;
+ }
+
@VisibleForTesting
Multimap<SchemaPath, ?> listeners() {
return listeners;
ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
return subscriptionListeners;
}
-
}