import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
* #offerNotification(DOMNotification, long, TimeUnit)}
* is realized by arming a background wakeup interrupt.
*/
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
DOMNotificationListener>> listeners = ImmutableMultimap.of();
private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
ListenerRegistry.create();
+ private final ScheduledThreadPoolExecutor observer;
- @SuppressWarnings("unchecked")
- private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
+ @VisibleForTesting
+ DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
this.executor = Preconditions.checkNotNull(executor);
-
+ this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
queueDepth, executor, ProducerType.MULTI, strategy);
disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
public static DOMNotificationRouter create(final int queueDepth, final long spinTime,
final long parkTime, final TimeUnit unit) {
+ Preconditions.checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth),
+ "Queue depth %s is not power-of-two", queueDepth);
final ExecutorService executor = Executors.newCachedThreadPool();
final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
final List<ListenerRegistration<DOMNotificationSubscriptionListener>> listenersAfter =
ImmutableList.copyOf(subscriptionListeners.getListeners());
- executor.submit(() -> {
+ executor.execute(() -> {
for (final ListenerRegistration<DOMNotificationSubscriptionListener> subListener : listenersAfter) {
try {
subListener.getInstance().onSubscriptionChanged(typesAfter);
public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
final L listener) {
final Set<SchemaPath> initialTypes = listeners.keySet();
- executor.submit(() -> listener.onSubscriptionChanged(initialTypes));
+ executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
return subscriptionListeners.registerWithType(listener);
}
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+ @VisibleForTesting
+ ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final long seq;
try {
if (subscribers.isEmpty()) {
return NO_LISTENERS;
}
-
// Attempt to perform a non-blocking publish first
- final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+ final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
return noBlock;
}
- /*
- * FIXME: we need a background thread, which will watch out for blocking too long. Here
- * we will arm a tasklet for it and synchronize delivery of interrupt properly.
- */
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ final Thread publishThread = Thread.currentThread();
+ ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+ final ListenableFuture<?> withBlock = putNotification(notification);
+ timerTask.cancel(true);
+ if (observer.getQueue().size() > 50) {
+ observer.purge();
+ }
+ return withBlock;
+ } catch (InterruptedException e) {
+ return DOMNotificationPublishService.REJECTED;
+ }
}
@Override
public void close() {
+ observer.shutdown();
disruptor.shutdown();
executor.shutdown();
}