/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
\r
8 package org.opendaylight.controller.sal.core.impl.notify;
\r
10 import java.util.Collections;
\r
11 import java.util.HashMap;
\r
12 import java.util.HashSet;
\r
13 import java.util.List;
\r
14 import java.util.Map;
\r
15 import java.util.Map.Entry;
\r
16 import java.util.Set;
\r
18 import org.opendaylight.controller.sal.core.api.BrokerService;
\r
19 import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
\r
20 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
\r
21 import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
\r
22 import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
\r
23 import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
\r
24 import org.opendaylight.controller.sal.core.api.notify.NotificationProviderService;
\r
25 import org.opendaylight.controller.sal.core.api.notify.NotificationService;
\r
26 import org.opendaylight.controller.sal.core.impl.BrokerServiceImpl;
\r
27 import org.opendaylight.controller.sal.core.impl.Utils;
\r
28 import org.opendaylight.controller.sal.core.spi.BrokerModule;
\r
29 import org.opendaylight.controller.yang.common.QName;
\r
30 import org.opendaylight.controller.yang.data.api.CompositeNode;
\r
31 import org.slf4j.Logger;
\r
32 import org.slf4j.LoggerFactory;
\r
36 public class NotificationModule implements BrokerModule {
\r
37 private static Logger log = LoggerFactory
\r
38 .getLogger(NotificationModule.class);
\r
40 private Map<QName, List<NotificationListener>> listeners = new HashMap<QName, List<NotificationListener>>();
\r
43 public Set<Class<? extends BrokerService>> getProvidedServices() {
\r
45 Set<Class<? extends BrokerService>> ret = new HashSet<Class<? extends BrokerService>>();
\r
46 ret.add(NotificationService.class);
\r
47 ret.add(NotificationProviderService.class);
\r
52 public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {
\r
54 Set<Class<? extends ConsumerFunctionality>> ret = new HashSet<Class<? extends ConsumerFunctionality>>();
\r
55 ret.add(NotificationListener.class);
\r
60 public <T extends BrokerService> T getServiceForSession(Class<T> service,
\r
61 ConsumerSession session) {
\r
62 if (NotificationProviderService.class.equals(service)
\r
63 && session instanceof ProviderSession) {
\r
64 @SuppressWarnings("unchecked")
\r
65 T ret = (T) newNotificationProviderService(session);
\r
67 } else if (NotificationService.class.equals(service)) {
\r
69 @SuppressWarnings("unchecked")
\r
70 T ret = (T) newNotificationConsumerService(session);
\r
74 throw new IllegalArgumentException(
\r
75 "The requested session-specific service is not provided by this module.");
\r
78 private void sendNotification(CompositeNode notification) {
\r
79 QName type = notification.getNodeType();
\r
80 List<NotificationListener> toNotify = listeners.get(type);
\r
81 log.info("Publishing notification " + type);
\r
83 if (toNotify == null) {
\r
84 // No listeners were registered - returns.
\r
88 for (NotificationListener listener : toNotify) {
\r
90 // FIXME: ensure that notification is immutable
\r
91 listener.onNotification(notification);
\r
92 } catch (Exception e) {
\r
93 log.error("Uncaught exception in NotificationListener", e);
\r
99 private NotificationService newNotificationConsumerService(
\r
100 ConsumerSession session) {
\r
101 return new NotificationConsumerSessionImpl();
\r
104 private NotificationProviderService newNotificationProviderService(
\r
105 ConsumerSession session) {
\r
106 return new NotificationProviderSessionImpl();
\r
109 private class NotificationConsumerSessionImpl extends BrokerServiceImpl
\r
110 implements NotificationService {
\r
112 Map<QName, List<NotificationListener>> consumerListeners = new HashMap<QName, List<NotificationListener>>();
\r
113 private boolean closed = false;
\r
116 public void addNotificationListener(QName notification,
\r
117 NotificationListener listener) {
\r
118 checkSessionState();
\r
119 if (notification == null) {
\r
120 throw new IllegalArgumentException(
\r
121 "Notification type must not be null.");
\r
123 if (listener == null) {
\r
124 throw new IllegalArgumentException("Listener must not be null.");
\r
127 Utils.addToMap(consumerListeners, notification, listener);
\r
128 Utils.addToMap(listeners, notification, listener);
\r
129 log.info("Registered listener for notification: " + notification);
\r
133 public void removeNotificationListener(QName notification,
\r
134 NotificationListener listener) {
\r
135 checkSessionState();
\r
136 if (notification == null) {
\r
137 throw new IllegalArgumentException(
\r
138 "Notification type must not be null.");
\r
140 if (listener == null) {
\r
141 throw new IllegalArgumentException("Listener must not be null.");
\r
143 Utils.removeFromMap(consumerListeners, notification, listener);
\r
144 Utils.removeFromMap(listeners, notification, listener);
\r
148 public void closeSession() {
\r
150 Set<Entry<QName, List<NotificationListener>>> toRemove = consumerListeners
\r
152 for (Entry<QName, List<NotificationListener>> entry : toRemove) {
\r
153 listeners.get(entry.getKey()).removeAll(entry.getValue());
\r
157 protected void checkSessionState() {
\r
159 throw new IllegalStateException("Session is closed");
\r
163 private class NotificationProviderSessionImpl extends
\r
164 NotificationConsumerSessionImpl implements
\r
165 NotificationProviderService {
\r
168 public void sendNotification(CompositeNode notification) {
\r
169 checkSessionState();
\r
170 if (notification == null)
\r
171 throw new IllegalArgumentException(
\r
172 "Notification must not be null.");
\r
173 NotificationModule.this.sendNotification(notification);
\r
178 public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {
\r
179 return Collections.emptySet();
\r