+ private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension {
+ @Override
+ public List<Extension> supportedExtensions() {
+ return List.of(this);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
+ throws InterruptedException {
+ return putNotificationImpl(notification);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
+ final var subscribers = listeners.get(notification.getType());
+ return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification,
+ final long timeout, final TimeUnit unit) throws InterruptedException {
+ final var subscribers = listeners.get(notification.getType());
+ if (subscribers.isEmpty()) {
+ return NO_LISTENERS;
+ }
+ // Attempt to perform a non-blocking publish first
+ final var noBlock = publish(notification, subscribers);
+ if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
+ return noBlock;
+ }
+
+ try {
+ final var publishThread = Thread.currentThread();
+ final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
+ final var withBlock = putNotificationImpl(notification);
+ timerTask.cancel(true);
+ if (observer.getQueue().size() > 50) {
+ observer.purge();
+ }
+ return withBlock;
+ } catch (InterruptedException e) {
+ return DOMNotificationPublishService.REJECTED;
+ }
+ }
+
+ @Override
+ public Registration registerDemandListener(final DemandListener listener) {
+ final var initialTypes = listeners.keySet();
+ executor.execute(() -> listener.onDemandUpdated(initialTypes));
+ return demandListeners.register(listener);
+ }
+ }
+
+ private final class SubscribeFacade implements DOMNotificationService {
+ @Override
+ public Registration registerNotificationListener(final DOMNotificationListener listener,
+ final Collection<Absolute> types) {
+ synchronized (DOMNotificationRouter.this) {
+ final var reg = new SingleReg(listener);
+
+ if (!types.isEmpty()) {
+ final var b = ImmutableMultimap.<Absolute, Reg>builder();
+ b.putAll(listeners);
+
+ for (var t : types) {
+ b.put(t, reg);
+ }
+
+ replaceListeners(b.build());
+ }
+
+ return reg;
+ }
+ }
+
+ @Override
+ public synchronized Registration registerNotificationListeners(
+ final Map<Absolute, DOMNotificationListener> typeToListener) {
+ synchronized (DOMNotificationRouter.this) {
+ final var b = ImmutableMultimap.<Absolute, Reg>builder();
+ b.putAll(listeners);
+
+ final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
+ for (var e : typeToListener.entrySet()) {
+ b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
+ }
+ replaceListeners(b.build());
+
+ final var regs = List.copyOf(tmp.values());
+ return new AbstractRegistration() {
+ @Override
+ protected void removeRegistration() {
+ regs.forEach(ComponentReg::close);
+ removeRegistrations(regs);
+ }
+ };
+ }
+ }
+ }
+