X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.xtend;h=6e493057b240e2f7bcce4c2ebad267afbf9fb4c6;hb=10a902e0f9c5c3e527e983374a195259909afb4c;hp=377d12ecac10340a2e5f64dc4773ae3ed0205e97;hpb=a251833f27fd00040904e2df316cd707c8129d1e;p=controller.git diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend index 377d12ecac..6e493057b2 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend @@ -15,22 +15,40 @@ import com.google.common.collect.HashMultimap import java.util.concurrent.ExecutorService import java.util.Collection import org.opendaylight.yangtools.concepts.Registration +import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator +import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory +import org.opendaylight.yangtools.concepts.ListenerRegistration +import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import java.util.Collections +import org.slf4j.LoggerFactory +import java.util.concurrent.Callable class NotificationBrokerImpl implements NotificationProviderService { val Multimap, NotificationListener> listeners; - val ExecutorService executor; + + @Property + var ExecutorService executor; + + @Property + var RuntimeCodeGenerator generator; + + @Property + var NotificationInvokerFactory invokerFactory; new(ExecutorService executor) { listeners = HashMultimap.create() this.executor = executor; } + @Deprecated override addNotificationListener(Class notificationType, NotificationListener listener) { listeners.put(notificationType, listener) } + @Deprecated override removeNotificationListener(Class notificationType, NotificationListener listener) { listeners.remove(notificationType, listener) @@ -45,68 +63,125 @@ class NotificationBrokerImpl implements NotificationProviderService { } @SuppressWarnings("unchecked") - def notifyAll(Collection> listeners, Notification notification) { + private def notifyAll(Collection> listeners, Notification notification) { listeners.forEach[(it as NotificationListener).onNotification(notification)] } + @Deprecated override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - throw new UnsupportedOperationException("TODO: auto-generated method stub") + throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead."); } + @Deprecated override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - throw new UnsupportedOperationException("TODO: auto-generated method stub") + throw new UnsupportedOperationException( + "Deprecated method. Use RegisterNotificationListener returned value to close registration.") } + @Deprecated override notify(Notification notification, ExecutorService service) { - publish(notification) + publish(notification, service) } override publish(Notification notification) { - notification.notificationTypes.forEach [ - listeners.get(it as Class)?.notifyAll(notification) - ] + publish(notification, executor) } override publish(Notification notification, ExecutorService service) { - publish(notification) + val allTypes = notification.notificationTypes + + var Iterable> listenerToNotify = Collections.emptySet(); + for (type : allTypes) { + listenerToNotify = listenerToNotify + listeners.get(type as Class) + } + val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet; + executor.invokeAll(tasks); } override registerNotificationListener(Class notificationType, NotificationListener listener) { - val reg = new GenericNotificationRegistration(notificationType,listener,this); - listeners.put(notificationType,listener); + val reg = new GenericNotificationRegistration(notificationType, listener, this); + listeners.put(notificationType, listener); return reg; } override registerNotificationListener( org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - + val invoker = invokerFactory.invokerFor(listener); + for (notifyType : invoker.supportedNotifications) { + listeners.put(notifyType, invoker.invocationProxy) + } + val registration = new GeneratedListenerRegistration(listener, invoker,this); + return registration as Registration; } - - + protected def unregisterListener(GenericNotificationRegistration reg) { - listeners.remove(reg.type,reg.instance); + listeners.remove(reg.type, reg.instance); + } + + protected def unregisterListener(GeneratedListenerRegistration reg) { + for (notifyType : reg.invoker.supportedNotifications) { + listeners.remove(notifyType, reg.invoker.invocationProxy) + } } } -class GenericNotificationRegistration implements Registration> { - - @Property - var NotificationListener instance; - + +class GenericNotificationRegistration extends AbstractObjectRegistration> implements ListenerRegistration> { + @Property val Class type; - - - val NotificationBrokerImpl notificationBroker; - - public new(Class type, NotificationListener instance,NotificationBrokerImpl broker) { - _instance = instance; + + var NotificationBrokerImpl notificationBroker; + + public new(Class type, NotificationListener instance, NotificationBrokerImpl broker) { + super(instance); _type = type; notificationBroker = broker; } + + override protected removeRegistration() { + notificationBroker.unregisterListener(this); + notificationBroker = null; + } +} + +class GeneratedListenerRegistration extends AbstractObjectRegistration implements ListenerRegistration { + + @Property + val NotificationInvoker invoker; - override close() { + var NotificationBrokerImpl notificationBroker; + + + new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) { + super(instance); + _invoker = invoker; + notificationBroker = broker; + } + + override protected removeRegistration() { notificationBroker.unregisterListener(this); + notificationBroker = null; + invoker.close(); + } +} + +@Data +class NotifyTask implements Callable { + + private static val log = LoggerFactory.getLogger(NotifyTask); + + val NotificationListener listener; + val Notification notification; + + override call() { + try { + listener.onNotification(notification); + } catch (Exception e) { + log.error("Unhandled exception {} thrown by listener: {} Notification: {}", e, listener, notification); + } + return null; } + }