BUG 3121 - destroy topic implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / eventsources / netconf / NetconfEventSource.java
index a6400647514ebec952d2f344102fe42d7f44812c..e4ad387f4d084d1b44fe26d5bff6e2dfe2611dbe 100644 (file)
@@ -47,6 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
@@ -111,9 +112,12 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
         notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
         Optional<Map<String, Stream>> streamMap = getAvailableStreams();
         if(streamMap.isPresent()){
+            LOG.debug("Stream configuration compare...");
             for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
                 final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+                LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
                 if(streamMap.get().containsKey(streamName)){
+                    LOG.debug("Stream containig on device");
                     notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
                 }
             }
@@ -127,7 +131,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
         Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
 
         if(dataBroker.isPresent()){
-
+            LOG.debug("GET Available streams ...");
             ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
             CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
 
@@ -136,6 +140,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
                 if(streams.isPresent()){
                     streamMap = new HashMap<>();
                     for(Stream stream : streams.get().getStream()){
+                        LOG.debug("*** find stream {}", stream.getName().getValue());
                         streamMap.put(stream.getName().getValue(), stream);
                     }
                 }
@@ -143,6 +148,8 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
                 LOG.warn("Can not read streams for node {}",this.nodeId);
             }
 
+        } else {
+            LOG.warn("No databroker on node {}", this.nodeId);
         }
 
         return Optional.fromNullable(streamMap);
@@ -150,34 +157,49 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
 
     @Override
     public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
-
+        LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
         final NotificationPattern notificationPattern = input.getNotificationPattern();
         final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
         return registerTopic(input.getTopicId(),matchingNotifications);
 
     }
 
-    private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+    @Override
+    public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+         for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
+             reg.unRegisterNotificationTopic(input.getTopicId());
+         }
+        return Util.resultRpcSuccessFor((Void) null) ;
+    }
 
+    private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+        LOG.debug("Join topic {} - register");
         JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
         if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
+            LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() );
             final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
             if(notifyService.isPresent()){
-                int subscribedStreams = 0;
+                int registeredNotificationCount = 0;
                 for(SchemaPath schemaNotification : notificationsToSubscribe){
                    for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
-                      LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
-                      reg.activateNotificationSource();
-                      boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
-                      if(regSuccess){
-                         subscribedStreams = subscribedStreams +1;
-                      }
+                       LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName());
+                       if(reg.checkNotificationPath(schemaNotification)){
+                           LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
+                           boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+                           if(regSuccess){
+                              registeredNotificationCount = registeredNotificationCount +1;
+                           }
+                       }
                    }
                 }
-                if(subscribedStreams > 0){
+                if(registeredNotificationCount > 0){
                     joinTopicStatus = JoinTopicStatus.Up;
                 }
+            } else {
+                LOG.warn("NO DOMNotification service on node {}", this.nodeId);
             }
+        } else {
+            LOG.debug("Notifications to subscribe has NOT found");
         }
 
         final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
@@ -195,13 +217,12 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
     public void deActivateStreams(){
         for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
            LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
-            reg.deActivateNotificationSource();
+           reg.deActivateNotificationSource();
         }
     }
 
     @Override
     public void onNotification(final DOMNotification notification) {
-        LOG.info("Notification {} has been arrived...",notification.getType());
         SchemaPath notificationPath = notification.getType();
         Date notificationEventTime = null;
         if(notification instanceof DOMEvent){
@@ -218,7 +239,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
 
                 for(TopicId topicId : topicIdsForNotification){
                     publishNotification(notification, topicId);
-                    LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+                    LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
                 }
 
             }