private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners = ImmutableMultimap.of();
private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners = ListenerRegistry.create();
- private DOMNotificationRouter(final ExecutorService executor, final Disruptor<DOMNotificationRouterEvent> disruptor) {
+ @SuppressWarnings("unchecked")
+ private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
this.executor = Preconditions.checkNotNull(executor);
- this.disruptor = Preconditions.checkNotNull(disruptor);
+
+ disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy);
+ disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
+ disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
+ disruptor.start();
}
- @SuppressWarnings("unchecked")
public static DOMNotificationRouter create(final int queueDepth) {
final ExecutorService executor = Executors.newCachedThreadPool();
- final Disruptor<DOMNotificationRouterEvent> disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, DEFAULT_STRATEGY);
- disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
- disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
- disruptor.start();
+ return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY);
+ }
+
+ public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime, final TimeUnit unit) {
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
- return new DOMNotificationRouter(executor, disruptor);
+ return new DOMNotificationRouter(executor, queueDepth, strategy);
}
@Override