--- /dev/null
+/*\r
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+package org.opendaylight.controller.sal.core.impl.notify;\r
+\r
+import java.util.Collection;\r
+import java.util.Collections;\r
+import java.util.HashSet;\r
+import java.util.Map;\r
+import java.util.Map.Entry;\r
+import java.util.Set;\r
+\r
+import org.opendaylight.controller.sal.core.api.BrokerService;\r
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;\r
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;\r
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;\r
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;\r
+import org.opendaylight.controller.sal.core.api.notify.NotificationProviderService;\r
+import org.opendaylight.controller.sal.core.api.notify.NotificationService;\r
+import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
+import org.opendaylight.controller.yang.common.QName;\r
+import org.opendaylight.controller.yang.data.api.CompositeNode;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.collect.HashMultimap;\r
+import com.google.common.collect.ImmutableSet;\r
+import com.google.common.collect.Multimap;\r
+\r
+public class NotificationModule implements BrokerModule {\r
+ private static Logger log = LoggerFactory\r
+ .getLogger(NotificationModule.class);\r
+\r
+ private Multimap<QName, NotificationListener> listeners = HashMultimap\r
+ .create();\r
+\r
+ private static final Set<Class<? extends BrokerService>> PROVIDED_SERVICE_TYPE = ImmutableSet\r
+ .of((Class<? extends BrokerService>) NotificationService.class,\r
+ NotificationProviderService.class);\r
+\r
+ private static final Set<Class<? extends ConsumerFunctionality>> SUPPORTED_CONSUMER_FUNCTIONALITY = ImmutableSet\r
+ .of((Class<? extends ConsumerFunctionality>) NotificationListener.class,\r
+ NotificationListener.class); // Workaround: if we use the\r
+ // version of method with only\r
+ // one argument, the generics\r
+ // inference will not work\r
+\r
+ @Override\r
+ public Set<Class<? extends BrokerService>> getProvidedServices() {\r
+ return PROVIDED_SERVICE_TYPE;\r
+ }\r
+\r
+ @Override\r
+ public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {\r
+ return SUPPORTED_CONSUMER_FUNCTIONALITY;\r
+ }\r
+\r
+ @Override\r
+ public <T extends BrokerService> T getServiceForSession(Class<T> service,\r
+ ConsumerSession session) {\r
+ if (NotificationProviderService.class.equals(service)\r
+ && session instanceof ProviderSession) {\r
+ @SuppressWarnings("unchecked")\r
+ T ret = (T) newNotificationProviderService(session);\r
+ return ret;\r
+ } else if (NotificationService.class.equals(service)) {\r
+\r
+ @SuppressWarnings("unchecked")\r
+ T ret = (T) newNotificationConsumerService(session);\r
+ return ret;\r
+ }\r
+\r
+ throw new IllegalArgumentException(\r
+ "The requested session-specific service is not provided by this module.");\r
+ }\r
+\r
+ private void sendNotification(CompositeNode notification) {\r
+ QName type = notification.getNodeType();\r
+ Collection<NotificationListener> toNotify = listeners.get(type);\r
+ log.info("Publishing notification " + type);\r
+\r
+ if (toNotify == null) {\r
+ // No listeners were registered - returns.\r
+ return;\r
+ }\r
+\r
+ for (NotificationListener listener : toNotify) {\r
+ try {\r
+ // FIXME: ensure that notification is immutable\r
+ listener.onNotification(notification);\r
+ } catch (Exception e) {\r
+ log.error("Uncaught exception in NotificationListener", e);\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+ private NotificationService newNotificationConsumerService(\r
+ ConsumerSession session) {\r
+ return new NotificationConsumerSessionImpl();\r
+ }\r
+\r
+ private NotificationProviderService newNotificationProviderService(\r
+ ConsumerSession session) {\r
+ return new NotificationProviderSessionImpl();\r
+ }\r
+\r
+ private class NotificationConsumerSessionImpl implements\r
+ NotificationService {\r
+\r
+ private Multimap<QName, NotificationListener> consumerListeners = HashMultimap\r
+ .create();\r
+ private boolean closed = false;\r
+\r
+ @Override\r
+ public void addNotificationListener(QName notification,\r
+ NotificationListener listener) {\r
+ checkSessionState();\r
+ if (notification == null) {\r
+ throw new IllegalArgumentException(\r
+ "Notification type must not be null.");\r
+ }\r
+ if (listener == null) {\r
+ throw new IllegalArgumentException("Listener must not be null.");\r
+ }\r
+\r
+ consumerListeners.put(notification, listener);\r
+ listeners.put(notification, listener);\r
+ log.info("Registered listener for notification: " + notification);\r
+ }\r
+\r
+ @Override\r
+ public void removeNotificationListener(QName notification,\r
+ NotificationListener listener) {\r
+ checkSessionState();\r
+ if (notification == null) {\r
+ throw new IllegalArgumentException(\r
+ "Notification type must not be null.");\r
+ }\r
+ if (listener == null) {\r
+ throw new IllegalArgumentException("Listener must not be null.");\r
+ }\r
+ consumerListeners.remove(notification, listener);\r
+ listeners.remove(notification, listener);\r
+ }\r
+\r
+ @Override\r
+ public void closeSession() {\r
+ closed = true;\r
+ Map<QName, Collection<NotificationListener>> toRemove = consumerListeners\r
+ .asMap();\r
+ for (Entry<QName, Collection<NotificationListener>> entry : toRemove\r
+ .entrySet()) {\r
+ listeners.remove(entry.getKey(), entry.getValue());\r
+ }\r
+ }\r
+\r
+ protected void checkSessionState() {\r
+ if (closed)\r
+ throw new IllegalStateException("Session is closed");\r
+ }\r
+ }\r
+\r
+ private class NotificationProviderSessionImpl extends\r
+ NotificationConsumerSessionImpl implements\r
+ NotificationProviderService {\r
+\r
+ @Override\r
+ public void sendNotification(CompositeNode notification) {\r
+ checkSessionState();\r
+ if (notification == null)\r
+ throw new IllegalArgumentException(\r
+ "Notification must not be null.");\r
+ NotificationModule.this.sendNotification(notification);\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {\r
+ return Collections.emptySet();\r
+ }\r
+}\r