BUG 2799: SPI for EventSources
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / eventsources / netconf / NetconfEventSourceManager.java
@@ -6,15 +6,15 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
 
-import com.google.common.base.Optional;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -26,6 +26,8 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
@@ -59,30 +61,52 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             .build();
     private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
 
-
-    private final EventSourceTopology eventSourceTopology;
     private final Map<String, String> streamMap;
-
-    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
-    private final ListenerRegistration<DataChangeListener> listenerReg;
+    private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
     private final DOMNotificationPublishService publishService;
     private final DOMMountPointService domMounts;
     private final MountPointService bindingMounts;
+    private ListenerRegistration<DataChangeListener> listenerRegistration;
+    private final EventSourceRegistry eventSourceRegistry;
+
+    public static NetconfEventSourceManager create(final DataBroker dataBroker,
+            final DOMNotificationPublishService domPublish,
+            final DOMMountPointService domMount,
+            final MountPointService bindingMount,
+            final EventSourceRegistry eventSourceRegistry,
+            final List<NamespaceToStream> namespaceMapping){
+
+        final NetconfEventSourceManager eventSourceManager =
+                new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
 
-    public NetconfEventSourceManager(final DataBroker dataStore,
-                              final DOMNotificationPublishService domPublish,
+        eventSourceManager.initialize(dataBroker);
+
+        return eventSourceManager;
+
+    }
+
+    private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
                               final DOMMountPointService domMount,
                               final MountPointService bindingMount,
-                              final EventSourceTopology eventSourceTopology,
+                              final EventSourceRegistry eventSourceRegistry,
                               final List<NamespaceToStream> namespaceMapping) {
 
-        listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
-        this.eventSourceTopology = eventSourceTopology;
+        Preconditions.checkNotNull(domPublish);
+        Preconditions.checkNotNull(domMount);
+        Preconditions.checkNotNull(bindingMount);
+        Preconditions.checkNotNull(eventSourceRegistry);
+        Preconditions.checkNotNull(namespaceMapping);
         this.streamMap = namespaceToStreamMapping(namespaceMapping);
         this.domMounts = domMount;
         this.bindingMounts = bindingMount;
         this.publishService = domPublish;
-        LOGGER.info("EventSourceManager initialized.");
+        this.eventSourceRegistry = eventSourceRegistry;
+    }
+
+    private void initialize(final DataBroker dataBroker){
+        Preconditions.checkNotNull(dataBroker);
+        listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+        LOGGER.info("NetconfEventSourceManager initialized.");
     }
 
     private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
@@ -97,7 +121,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+
         LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
@@ -105,22 +129,19 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             }
         }
 
-
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
                 nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
             }
         }
 
-
     }
 
     private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
 
         // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
         if ( node == null ) {
-            LOGGER.debug("OnDataChanged Event. Node is null.");
-            return;
+            throw new IllegalStateException("Node is null");
         }
         if ( isNetconfNode(node) == false ) {
             LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
@@ -134,7 +155,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             return;
         }
 
-        if(!netconfSources.containsKey(key)) {
+        if(!eventSourceRegistration.containsKey(key)) {
             createEventSource(key,node);
         }
     }
@@ -144,10 +165,12 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
 
         if(netconfMount.isPresent() && bindingMount.isPresent()) {
-            final String nodeId = node.getNodeId().getValue();
-            final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
-            eventSourceTopology.register(node,netconfEventSource);
-            netconfSources.putIfAbsent(key, netconfEventSource);
+
+            final NetconfEventSource netconfEventSource =
+                    new NetconfEventSource(node, streamMap, netconfMount.get(), publishService, bindingMount.get());
+            final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
+            eventSourceRegistration.putIfAbsent(key, registration);
+
         }
     }
 
@@ -159,13 +182,21 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         return node.getAugmentation(NetconfNode.class) != null ;
     }
 
-    public boolean isEventSource(final Node node) {
-        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+    private boolean isEventSource(final Node node) {
 
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
         return isEventSource(netconfNode);
+
     }
 
     private boolean isEventSource(final NetconfNode node) {
+        if (node.getAvailableCapabilities() == null) {
+            return false;
+        }
+        final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
+        if(capabilities == null) {
+             return false;
+        }
         for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
             if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
                 return true;
@@ -177,6 +208,10 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
 
     @Override
     public void close() {
-        listenerReg.close();
+        for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+            reg.close();
+        }
+        listenerRegistration.close();
     }
+
 }
\ No newline at end of file