Added support for binding-independent RPCs
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / NotificationModule.java
diff --git a/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/NotificationModule.java b/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/NotificationModule.java
new file mode 100644 (file)
index 0000000..e507796
--- /dev/null
@@ -0,0 +1,187 @@
+/*\r
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+package org.opendaylight.controller.sal.core.impl.notify;\r
+\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.HashSet;\r
+import java.util.Map;\r
+import java.util.Map.Entry;\r
+import java.util.Set;\r
+\r
+import org.opendaylight.controller.sal.core.api.BrokerService;\r
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;\r
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;\r
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;\r
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;\r
+import org.opendaylight.controller.sal.core.api.notify.NotificationProviderService;\r
+import org.opendaylight.controller.sal.core.api.notify.NotificationService;\r
+import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
+import org.opendaylight.controller.yang.common.QName;\r
+import org.opendaylight.controller.yang.data.api.CompositeNode;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.collect.HashMultimap;\r
+import com.google.common.collect.ImmutableSet;\r
+import com.google.common.collect.Multimap;\r
+\r
+public class NotificationModule implements BrokerModule {\r
+    private static Logger log = LoggerFactory\r
+            .getLogger(NotificationModule.class);\r
+\r
+    private Multimap<QName, NotificationListener> listeners = HashMultimap\r
+            .create();\r
+\r
+    private static final Set<Class<? extends BrokerService>> PROVIDED_SERVICE_TYPE = ImmutableSet\r
+            .of((Class<? extends BrokerService>) NotificationService.class,\r
+                    NotificationProviderService.class);\r
+\r
+    private static final Set<Class<? extends ConsumerFunctionality>> SUPPORTED_CONSUMER_FUNCTIONALITY = ImmutableSet\r
+            .of((Class<? extends ConsumerFunctionality>) NotificationListener.class,\r
+                    NotificationListener.class); // Workaround: if we use the\r
+                                                 // version of method with only\r
+                                                 // one argument, the generics\r
+                                                 // inference will not work\r
+\r
+    @Override\r
+    public Set<Class<? extends BrokerService>> getProvidedServices() {\r
+        return PROVIDED_SERVICE_TYPE;\r
+    }\r
+\r
+    @Override\r
+    public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {\r
+        return SUPPORTED_CONSUMER_FUNCTIONALITY;\r
+    }\r
+\r
+    @Override\r
+    public <T extends BrokerService> T getServiceForSession(Class<T> service,\r
+            ConsumerSession session) {\r
+        if (NotificationProviderService.class.equals(service)\r
+                && session instanceof ProviderSession) {\r
+            @SuppressWarnings("unchecked")\r
+            T ret = (T) newNotificationProviderService(session);\r
+            return ret;\r
+        } else if (NotificationService.class.equals(service)) {\r
+\r
+            @SuppressWarnings("unchecked")\r
+            T ret = (T) newNotificationConsumerService(session);\r
+            return ret;\r
+        }\r
+\r
+        throw new IllegalArgumentException(\r
+                "The requested session-specific service is not provided by this module.");\r
+    }\r
+\r
+    private void sendNotification(CompositeNode notification) {\r
+        QName type = notification.getNodeType();\r
+        Collection<NotificationListener> toNotify = listeners.get(type);\r
+        log.info("Publishing notification " + type);\r
+\r
+        if (toNotify == null) {\r
+            // No listeners were registered - returns.\r
+            return;\r
+        }\r
+\r
+        for (NotificationListener listener : toNotify) {\r
+            try {\r
+                // FIXME: ensure that notification is immutable\r
+                listener.onNotification(notification);\r
+            } catch (Exception e) {\r
+                log.error("Uncaught exception in NotificationListener", e);\r
+            }\r
+        }\r
+\r
+    }\r
+\r
+    private NotificationService newNotificationConsumerService(\r
+            ConsumerSession session) {\r
+        return new NotificationConsumerSessionImpl();\r
+    }\r
+\r
+    private NotificationProviderService newNotificationProviderService(\r
+            ConsumerSession session) {\r
+        return new NotificationProviderSessionImpl();\r
+    }\r
+\r
+    private class NotificationConsumerSessionImpl implements\r
+            NotificationService {\r
+\r
+        private Multimap<QName, NotificationListener> consumerListeners = HashMultimap\r
+                .create();\r
+        private boolean closed = false;\r
+\r
+        @Override\r
+        public void addNotificationListener(QName notification,\r
+                NotificationListener listener) {\r
+            checkSessionState();\r
+            if (notification == null) {\r
+                throw new IllegalArgumentException(\r
+                        "Notification type must not be null.");\r
+            }\r
+            if (listener == null) {\r
+                throw new IllegalArgumentException("Listener must not be null.");\r
+            }\r
+\r
+            consumerListeners.put(notification, listener);\r
+            listeners.put(notification, listener);\r
+            log.info("Registered listener for notification: " + notification);\r
+        }\r
+\r
+        @Override\r
+        public void removeNotificationListener(QName notification,\r
+                NotificationListener listener) {\r
+            checkSessionState();\r
+            if (notification == null) {\r
+                throw new IllegalArgumentException(\r
+                        "Notification type must not be null.");\r
+            }\r
+            if (listener == null) {\r
+                throw new IllegalArgumentException("Listener must not be null.");\r
+            }\r
+            consumerListeners.remove(notification, listener);\r
+            listeners.remove(notification, listener);\r
+        }\r
+\r
+        @Override\r
+        public void closeSession() {\r
+            closed = true;\r
+            Map<QName, Collection<NotificationListener>> toRemove = consumerListeners\r
+                    .asMap();\r
+            for (Entry<QName, Collection<NotificationListener>> entry : toRemove\r
+                    .entrySet()) {\r
+                listeners.remove(entry.getKey(), entry.getValue());\r
+            }\r
+        }\r
+\r
+        protected void checkSessionState() {\r
+            if (closed)\r
+                throw new IllegalStateException("Session is closed");\r
+        }\r
+    }\r
+\r
+    private class NotificationProviderSessionImpl extends\r
+            NotificationConsumerSessionImpl implements\r
+            NotificationProviderService {\r
+\r
+        @Override\r
+        public void sendNotification(CompositeNode notification) {\r
+            checkSessionState();\r
+            if (notification == null)\r
+                throw new IllegalArgumentException(\r
+                        "Notification must not be null.");\r
+            NotificationModule.this.sendNotification(notification);\r
+        }\r
+    }\r
+\r
+    @Override\r
+    public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {\r
+        return Collections.emptySet();\r
+    }\r
+}\r