Merge "Bug fixes for netconf southbound plugin."
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.xtend
index a3c8c5f232b0b6e7daae0350880eca7e790985a5..9a431fec74f4a3cf7c3f4d694f6c2000df3de718 100644 (file)
@@ -21,22 +21,31 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration
 import org.opendaylight.yangtools.concepts.Registration\r
 import org.opendaylight.yangtools.yang.binding.Notification\r
 import org.slf4j.LoggerFactory\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\r
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\rimport com.google.common.collect.Multimaps\r
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\rimport java.util.Set\r
+import java.util.Set\r
+import com.google.common.collect.ImmutableSet\r
+import java.util.concurrent.Future\r
 \r
 class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
-\r
+    \r
+    val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;\r
+    \r
     val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;\r
 \r
     @Property\r
     var ExecutorService executor;\r
+    \r
+    val logger = LoggerFactory.getLogger(NotificationBrokerImpl)\r
 \r
     new() {\r
-        listeners = HashMultimap.create()\r
+        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())\r
     }\r
 \r
     @Deprecated\r
     new(ExecutorService executor) {\r
-        listeners = HashMultimap.create()\r
+        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())\r
         this.executor = executor;\r
     }\r
 \r
@@ -94,21 +103,41 @@ class NotificationBrokerImpl implements NotificationProviderService, AutoCloseab
             listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)\r
         }\r
         val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;\r
-        executor.invokeAll(tasks);\r
+        submitAll(executor,tasks);\r
     }\r
-\r
+    \r
+    def submitAll(ExecutorService service, Set<NotifyTask> tasks) {\r
+        val ret = ImmutableSet.<Future<Object>>builder();\r
+        for(task : tasks) {\r
+            ret.add(service.submit(task));\r
+        }\r
+        return ret.build();\r
+    }\r
+    \r
     override <T extends Notification> registerNotificationListener(Class<T> notificationType,\r
         NotificationListener<T> listener) {\r
         val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);\r
         listeners.put(notificationType, listener);\r
+        announceNotificationSubscription(notificationType);\r
         return reg;\r
     }\r
+    \r
+    def announceNotificationSubscription(Class<? extends Notification> notification) {\r
+        for (listener : interestListeners) {\r
+            try {\r
+                listener.instance.onNotificationSubscribtion(notification);\r
+            } catch (Exception e) {\r
+                logger.error("", e.message)\r
+            }\r
+        }\r
+    }\r
 \r
     override registerNotificationListener(\r
         org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
         val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);\r
         for (notifyType : invoker.supportedNotifications) {\r
             listeners.put(notifyType, invoker.invocationProxy)\r
+            announceNotificationSubscription(notifyType)\r
         }\r
         val registration = new GeneratedListenerRegistration(listener, invoker,this);\r
         return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;\r
@@ -128,6 +157,14 @@ class NotificationBrokerImpl implements NotificationProviderService, AutoCloseab
         //FIXME: implement properly.\r
     }\r
     \r
+    override registerInterestListener(NotificationInterestListener interestListener) {\r
+        val registration = interestListeners.register(interestListener);\r
+        \r
+        for(notification : listeners.keySet) {\r
+            interestListener.onNotificationSubscribtion(notification);\r
+        }\r
+        return registration\r
+    }\r
 }\r
 \r
 class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {\r