import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
* routing of notifications from publishers to subscribers.
* #offerNotification(DOMNotification, long, TimeUnit)}
* is realized by arming a background wakeup interrupt.
*/
-public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
+public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
DOMNotificationListener>> listeners = ImmutableMultimap.of();
private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
ListenerRegistry.create();
+ private final ScheduledThreadPoolExecutor observer;
@SuppressWarnings("unchecked")
- private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
+ @VisibleForTesting
+ DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
this.executor = Preconditions.checkNotNull(executor);
-
+ this.observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("DOMNotificationRouter-%d").build());
disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
queueDepth, executor, ProducerType.MULTI, strategy);
disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
+ @VisibleForTesting
+ ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
final long seq;
try {
if (subscribers.isEmpty()) {
return NO_LISTENERS;
}
-
// Attempt to perform a non-blocking publish first
- final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
+ final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
return noBlock;
}
- /*
- * FIXME: we need a background thread, which will watch out for blocking too long. Here
- * we will arm a tasklet for it and synchronize delivery of interrupt properly.
- */
- throw new UnsupportedOperationException("Not implemented yet");
+ try {
+ final Thread publishThread = Thread.currentThread();
+ ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+ final ListenableFuture<?> withBlock = putNotification(notification);
+ timerTask.cancel(true);
+ if (observer.getQueue().size() > 50) {
+ observer.purge();
+ }
+ return withBlock;
+ } catch (InterruptedException e) {
+ return DOMNotificationPublishService.REJECTED;
+ }
}
@Override
public void close() {
+ observer.shutdown();
disruptor.shutdown();
executor.shutdown();
}