X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.xtend;h=fe2681f1f768ab6c1f607550d2c637cda1297d4e;hp=b10c06f0c53e359ec2cabe3028b7243339c6f53b;hb=7312d7684413675e8dda5e3616deaa727e6d18ba;hpb=5fe4a2c1eb78c0d0fe349354f39df4861ded6ee3 diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend index b10c06f0c5..fe2681f1f7 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend @@ -8,79 +8,51 @@ package org.opendaylight.controller.sal.binding.impl import com.google.common.collect.HashMultimap +import com.google.common.collect.ImmutableSet import com.google.common.collect.Multimap -import java.util.Collection +import com.google.common.collect.Multimaps import java.util.Collections import java.util.concurrent.Callable import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.Set import org.opendaylight.controller.sal.binding.api.NotificationListener import org.opendaylight.controller.sal.binding.api.NotificationProviderService +import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener +import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.concepts.ListenerRegistration import org.opendaylight.yangtools.concepts.Registration +import org.opendaylight.yangtools.concepts.util.ListenerRegistry import org.opendaylight.yangtools.yang.binding.Notification -import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder - -class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { +import org.slf4j.LoggerFactory +class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { + + val ListenerRegistry interestListeners = ListenerRegistry.create; + val Multimap, NotificationListener> listeners; @Property var ExecutorService executor; + + val logger = LoggerFactory.getLogger(NotificationBrokerImpl) new() { - listeners = HashMultimap.create() + listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()) } @Deprecated new(ExecutorService executor) { - listeners = HashMultimap.create() + listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()) this.executor = executor; } - @Deprecated - override addNotificationListener(Class notificationType, - NotificationListener listener) { - listeners.put(notificationType, listener) - } - - @Deprecated - override removeNotificationListener(Class notificationType, - NotificationListener listener) { - listeners.remove(notificationType, listener) - } - - override notify(Notification notification) { - publish(notification) - } - def getNotificationTypes(Notification notification) { notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)] } - @SuppressWarnings("unchecked") - private def notifyAll(Collection> listeners, Notification notification) { - listeners.forEach[(it as NotificationListener).onNotification(notification)] - } - - @Deprecated - override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead."); - - } - - @Deprecated - override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - throw new UnsupportedOperationException( - "Deprecated method. Use RegisterNotificationListener returned value to close registration.") - } - - @Deprecated - override notify(Notification notification, ExecutorService service) { - publish(notification, service) - } - override publish(Notification notification) { publish(notification, executor) } @@ -93,21 +65,41 @@ class NotificationBrokerImpl implements NotificationProviderService, AutoCloseab listenerToNotify = listenerToNotify + listeners.get(type as Class) } val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet; - executor.invokeAll(tasks); + submitAll(executor,tasks); } - + + def submitAll(ExecutorService service, Set tasks) { + val ret = ImmutableSet.>builder(); + for(task : tasks) { + ret.add(service.submit(task)); + } + return ret.build(); + } + override registerNotificationListener(Class notificationType, NotificationListener listener) { val reg = new GenericNotificationRegistration(notificationType, listener, this); listeners.put(notificationType, listener); + announceNotificationSubscription(notificationType); return reg; } + + def announceNotificationSubscription(Class notification) { + for (listener : interestListeners) { + try { + listener.instance.onNotificationSubscribtion(notification); + } catch (Exception e) { + logger.error("", e.message) + } + } + } override registerNotificationListener( org.opendaylight.yangtools.yang.binding.NotificationListener listener) { val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); for (notifyType : invoker.supportedNotifications) { listeners.put(notifyType, invoker.invocationProxy) + announceNotificationSubscription(notifyType) } val registration = new GeneratedListenerRegistration(listener, invoker,this); return registration as Registration; @@ -127,6 +119,14 @@ class NotificationBrokerImpl implements NotificationProviderService, AutoCloseab //FIXME: implement properly. } + override registerInterestListener(NotificationInterestListener interestListener) { + val registration = interestListeners.register(interestListener); + + for(notification : listeners.keySet) { + interestListener.onNotificationSubscribtion(notification); + } + return registration + } } class GenericNotificationRegistration extends AbstractObjectRegistration> implements ListenerRegistration> { @@ -174,14 +174,24 @@ class NotifyTask implements Callable { private static val log = LoggerFactory.getLogger(NotifyTask); + @SuppressWarnings("rawtypes") val NotificationListener listener; val Notification notification; override call() { + //Only logging the complete notification in debug mode try { - log.info("Delivering notification {} to {}",notification,listener); + if(log.isDebugEnabled){ + log.debug("Delivering notification {} to {}",notification,listener); + } else { + log.trace("Delivering notification {} to {}",notification.class.name,listener); + } listener.onNotification(notification); - log.info("Notification delivered {} to {}",notification,listener); + if(log.isDebugEnabled){ + log.debug("Notification delivered {} to {}",notification,listener); + } else { + log.trace("Notification delivered {} to {}",notification.class.name,listener); + } } catch (Exception e) { log.error("Unhandled exception thrown by listener: {}", listener, e); }