BUG 3121 - destroy topic implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
index e0d4fe21e178b5cd4fd9d6026fecf56a32dcc940..2e654d0b8b3cb49db462c22f72fc63d1e5fc3e9e 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.messagebus.eventsources.netconf;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
@@ -58,6 +59,7 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
         this.lastEventTime= null;
         setReplaySupported(this.stream.isReplaySupport());
         setActive(false);
+        LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
     }
 
     void activateNotificationSource() {
@@ -129,16 +131,26 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
 
     @Override
     boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
-        if(validateNotificationPath(notificationPath) == false){
+
+        if(checkNotificationPath(notificationPath) == false){
             LOG.debug("Bad SchemaPath for notification try to register");
             return false;
         }
+
         final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
         if(notifyService.isPresent() == false){
             LOG.debug("DOMNotificationService is not present");
             return false;
         }
-        ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
+
+        activateNotificationSource();
+        if(isActive() == false){
+            LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString());
+            return false;
+        }
+
+        ListenerRegistration<NetconfEventSource> registration =
+                notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
         notificationRegistrationMap.put(notificationPath, registration);
         ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
         if(topicIds == null){
@@ -149,16 +161,30 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
                 topicIds.add(topicId);
             }
         }
+
         notificationTopicMap.put(notificationPath, topicIds);
         return true;
     }
 
-    private boolean validateNotificationPath(SchemaPath notificationPath){
-        if(notificationPath == null){
-            return false;
+    @Override
+    synchronized void unRegisterNotificationTopic(TopicId topicId) {
+        List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+        for(SchemaPath notifKey : notificationTopicMap.keySet()){
+            ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+            if(topicList != null){
+                topicList.remove(topicId);
+                if(topicList.isEmpty()){
+                    notificationPathToRemove.add(notifKey);
+                }
+            }
+        }
+        for(SchemaPath notifKey : notificationPathToRemove){
+            notificationTopicMap.remove(notifKey);
+            ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+            if(reg != null){
+                reg.close();
+            }
         }
-        String nameSpace = notificationPath.getLastComponent().toString();
-        return nameSpace.startsWith(getNotificationUrnPrefix());
     }
 
     Optional<Date> getLastEventTime() {
@@ -175,9 +201,4 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
         closeStream();
     }
 
-    @Override
-    void unRegisterNotificationTopic(TopicId topicId) {
-        // TODO: use it when destroy topic will be implemented
-    }
-
 }