Bug 3135 - Fixed support for InterestListener
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / md / sal / binding / compat / HeliumNotificationProviderServiceWithInterestListeners.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.binding.compat;
9
10 import com.google.common.collect.Sets;
11 import java.util.Collections;
12 import java.util.Iterator;
13 import java.util.Set;
14 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
15 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
16 import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
17 import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener;
18 import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
19 import org.opendaylight.yangtools.concepts.ListenerRegistration;
20 import org.opendaylight.yangtools.util.ListenerRegistry;
21 import org.opendaylight.yangtools.yang.binding.Notification;
22 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class HeliumNotificationProviderServiceWithInterestListeners extends HeliumNotificationProviderServiceAdapter {
27
28     private static final Logger LOG = LoggerFactory.getLogger(HeliumNotificationProviderServiceWithInterestListeners.class);
29
30     private final ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create();
31     private final ListenerRegistration<Listener> domListener;
32     private final BindingToNormalizedNodeCodec codec;
33
34     public HeliumNotificationProviderServiceWithInterestListeners(
35             final NotificationPublishService publishService, final NotificationService listenService, final BindingToNormalizedNodeCodec codec, final DOMNotificationSubscriptionListenerRegistry registry) {
36         super(publishService, listenService);
37         this.codec = codec;
38         this.domListener = registry.registerSubscriptionListener(new Listener());
39     }
40
41     @Override
42     public ListenerRegistration<NotificationInterestListener> registerInterestListener(
43             final NotificationInterestListener listener) {
44         notifyListener(listener, translate(domListener.getInstance().getAllObserved()));
45         return interestListeners.register(listener);
46     }
47
48     private Set<Class<? extends Notification>> translate(final Set<SchemaPath> added) {
49         return codec.getNotificationClasses(added);
50     }
51
52     private void notifyAllListeners(final Set<SchemaPath> added) {
53         final Iterator<ListenerRegistration<NotificationInterestListener>> listeners = interestListeners.iterator();
54         if(listeners.hasNext()) {
55             final Set<Class<? extends Notification>> baEvent = translate(added);
56             while(listeners.hasNext()) {
57                 final NotificationInterestListener listenerRef = listeners.next().getInstance();
58                 try {
59                     notifyListener(listenerRef,baEvent);
60                 } catch (final Exception e) {
61                     LOG.warn("Unhandled exception during invoking listener {}",e, listenerRef);
62                 }
63             }
64         }
65     }
66
67     private void notifyListener(final NotificationInterestListener listener, final Set<Class<? extends Notification>> baEvent) {
68         for(final Class<? extends Notification> event: baEvent) {
69             listener.onNotificationSubscribtion(event);
70         }
71     }
72
73     private final class Listener implements DOMNotificationSubscriptionListener {
74
75         private volatile Set<SchemaPath> allObserved = Collections.emptySet();
76
77         @Override
78         public void onSubscriptionChanged(final Set<SchemaPath> currentTypes) {
79             final Set<SchemaPath> added = Sets.difference(currentTypes, allObserved).immutableCopy();
80             notifyAllListeners(added);
81             allObserved = Sets.union(allObserved, added).immutableCopy();
82         }
83
84         Set<SchemaPath> getAllObserved() {
85             return allObserved;
86         }
87     }
88
89     @Override
90     public void close() throws Exception {
91         super.close();
92         domListener.close();
93     }
94 }