X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.java;h=58e46ceca3d14d63bb2dbd6a45feec2417280f21;hb=c74d5c2399e500fe3e690edc8cee497b1cb6f867;hp=9efd48eabdb9f6889bc45551f38247f37cf9d048;hpb=c59d59a9ef1fc10e09a85044bb1bd208ff3219f2;p=controller.git diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java index 9efd48eabd..58e46ceca3 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java @@ -7,126 +7,82 @@ */ package org.opendaylight.controller.sal.binding.impl; -import java.util.Collection; -import java.util.Collections; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.concurrent.GuardedBy; -import org.eclipse.xtext.xbase.lib.Conversions; -import org.eclipse.xtext.xbase.lib.Functions.Function1; -import org.eclipse.xtext.xbase.lib.IterableExtensions; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.concepts.util.ListenerRegistry; +import org.opendaylight.yangtools.util.ListenerRegistry; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; -import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.SetMultimap; public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class); private final ListenerRegistry interestListeners = ListenerRegistry.create(); + private final AtomicReference listeners = new AtomicReference<>(new ListenerMapGeneration()); + private final ExecutorService executor; - private final Multimap,NotificationListener> listeners; - private ExecutorService executor; - - public NotificationBrokerImpl() { - HashMultimap,NotificationListener> _create = HashMultimap., NotificationListener>create(); - SetMultimap,NotificationListener> _synchronizedSetMultimap = Multimaps., NotificationListener>synchronizedSetMultimap(_create); - this.listeners = _synchronizedSetMultimap; - } - - @Deprecated public NotificationBrokerImpl(final ExecutorService executor) { - HashMultimap,NotificationListener> _create = HashMultimap., NotificationListener>create(); - SetMultimap,NotificationListener> _synchronizedSetMultimap = Multimaps., NotificationListener>synchronizedSetMultimap(_create); - this.listeners = _synchronizedSetMultimap; - this.setExecutor(executor); - } - - public void setExecutor(final ExecutorService executor) { this.executor = Preconditions.checkNotNull(executor); } - public Iterable> getNotificationTypes(final Notification notification) { - Class _class = notification.getClass(); - Class[] _interfaces = _class.getInterfaces(); - final Function1,Boolean> _function = new Function1,Boolean>() { - @Override - public Boolean apply(final Class it) { - boolean _and = false; - boolean _notEquals = (!Objects.equal(it, Notification.class)); - if (!_notEquals) { - _and = false; - } else { - boolean _isAssignableFrom = Notification.class.isAssignableFrom(it); - _and = (_notEquals && _isAssignableFrom); - } - return Boolean.valueOf(_and); - } - }; - Iterable> _filter = IterableExtensions.>filter(((Iterable>)Conversions.doWrapArray(_interfaces)), _function); - return _filter; - } - @Override public void publish(final Notification notification) { - this.publish(notification, executor); + publish(notification, executor); } @Override public void publish(final Notification notification, final ExecutorService service) { - final Iterable> allTypes = this.getNotificationTypes(notification); - Iterable> listenerToNotify = Collections.>emptySet(); - for (final Class type : allTypes) { - Collection> _get = this.listeners.get(((Class) type)); - Iterable> _plus = Iterables.>concat(listenerToNotify, _get); - listenerToNotify = _plus; + for (NotificationListenerRegistration r : listeners.get().listenersFor(notification)) { + service.submit(new NotifyTask(r, notification)); } - final Function1,NotifyTask> _function = new Function1,NotifyTask>() { - @Override - public NotifyTask apply(final NotificationListener it) { - NotifyTask _notifyTask = new NotifyTask(it, notification); - return _notifyTask; - } - }; - Iterable _map = IterableExtensions., NotifyTask>map(listenerToNotify, _function); - final Set tasks = IterableExtensions.toSet(_map); - this.submitAll(executor, tasks); } - public ImmutableSet> submitAll(final ExecutorService service, final Set tasks) { - final Builder> ret = ImmutableSet.>builder(); - for (final NotifyTask task : tasks) { - Future _submit = service.submit(task); - ret.add(_submit); + @GuardedBy("this") + private Multimap, NotificationListenerRegistration> mutableListeners() { + return HashMultimap.create(listeners.get().getListeners()); + } + + private final void addRegistrations(final NotificationListenerRegistration... registrations) { + synchronized (this) { + final Multimap, NotificationListenerRegistration> newListeners = + mutableListeners(); + for (NotificationListenerRegistration reg : registrations) { + newListeners.put(reg.getType(), reg); + } + + listeners.set(new ListenerMapGeneration(newListeners)); + } + + // Notifications are dispatched out of lock... + for (NotificationListenerRegistration reg : registrations) { + announceNotificationSubscription(reg.getType()); } - return ret.build(); } - @Override - public Registration> registerNotificationListener(final Class notificationType, final NotificationListener listener) { - GenericNotificationRegistration _genericNotificationRegistration = new GenericNotificationRegistration(notificationType, listener, this); - final GenericNotificationRegistration reg = _genericNotificationRegistration; - this.listeners.put(notificationType, listener); - this.announceNotificationSubscription(notificationType); - return reg; + private synchronized void removeRegistrations(final NotificationListenerRegistration... registrations) { + final Multimap, NotificationListenerRegistration> newListeners = + mutableListeners(); + + for (NotificationListenerRegistration reg : registrations) { + newListeners.remove(reg.getType(), reg); + } + + listeners.set(new ListenerMapGeneration(newListeners)); } private void announceNotificationSubscription(final Class notification) { @@ -141,49 +97,63 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto } @Override - public Registration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); - Set> _supportedNotifications = invoker.getSupportedNotifications(); - for (final Class notifyType : _supportedNotifications) { - { - NotificationListener _invocationProxy = invoker.getInvocationProxy(); - this.listeners.put(notifyType, _invocationProxy); - this.announceNotificationSubscription(notifyType); - } + public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { + final ListenerRegistration registration = this.interestListeners.register(interestListener); + + for (final Class notification : listeners.get().getKnownTypes()) { + interestListener.onNotificationSubscribtion(notification); } - GeneratedListenerRegistration _generatedListenerRegistration = new GeneratedListenerRegistration(listener, invoker, this); - final GeneratedListenerRegistration registration = _generatedListenerRegistration; - return (registration); + return registration; } - protected boolean unregisterListener(final GenericNotificationRegistration reg) { - Class _type = reg.getType(); - NotificationListener _instance = reg.getInstance(); - boolean _remove = this.listeners.remove(_type, _instance); - return _remove; + @Override + public NotificationListenerRegistration registerNotificationListener(final Class notificationType, final NotificationListener listener) { + final NotificationListenerRegistration reg = new AbstractNotificationListenerRegistration(notificationType, listener) { + @Override + protected void removeRegistration() { + removeRegistrations(this); + } + }; + + addRegistrations(reg); + return reg; } - protected void unregisterListener(final GeneratedListenerRegistration reg) { - NotificationInvoker _invoker = reg.getInvoker(); - Set> _supportedNotifications = _invoker.getSupportedNotifications(); - for (final Class notifyType : _supportedNotifications) { - NotificationInvoker _invoker_1 = reg.getInvoker(); - NotificationListener _invocationProxy = _invoker_1.getInvocationProxy(); - this.listeners.remove(notifyType, _invocationProxy); + @Override + public ListenerRegistration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); + final Set> types = invoker.getSupportedNotifications(); + final NotificationListenerRegistration[] regs = new NotificationListenerRegistration[types.size()]; + + // Populate the registrations... + int i = 0; + for (Class type : types) { + regs[i] = new AggregatedNotificationListenerRegistration(type, invoker.getInvocationProxy(), regs) { + @Override + protected void removeRegistration() { + // Nothing to do, will be cleaned up by parent (below) + } + }; + ++i; } + + // ... now put them to use ... + addRegistrations(regs); + + // ... finally return the parent registration + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + removeRegistrations(regs); + for (ListenerRegistration reg : regs) { + reg.close(); + } + } + }; } @Override public void close() { } - @Override - public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { - final ListenerRegistration registration = this.interestListeners.register(interestListener); - Set> _keySet = this.listeners.keySet(); - for (final Class notification : _keySet) { - interestListener.onNotificationSubscribtion(notification); - } - return registration; - } }