Merge "BUG 3057 - notify added event source by topics created before"
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
index 076d1b2fc7e9c1d42fee6e0ece1d7c9adc4a3228..6140a78ba586a7beaa89a92ae0ab9cc5fb109016 100644 (file)
@@ -20,6 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
@@ -57,7 +61,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
 
     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
@@ -73,7 +78,9 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+    private final Map<EventSourceTopic, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
+            new ConcurrentHashMap<>();
+    private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
             new ConcurrentHashMap<>();
 
     private final DataBroker dataBroker;
@@ -91,7 +98,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
-
+        LOG.info("EventSourceRegistry has been initialized");
     }
 
     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
@@ -104,13 +111,24 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
 
     }
 
-    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
-        final NodeKey nodeKey = node.getKey();
+    private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
+        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        tx.delete(OPERATIONAL, path);
+        tx.submit();
+    }
+
+    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+        final NodeKey nodeKey = sourcePath.getKey();
         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
         putData(OPERATIONAL, augmentPath, nodeAgument);
     }
 
+    private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
+        final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+        deleteData(OPERATIONAL, augmentPath);
+    }
+
     private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
 
         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
@@ -150,8 +168,8 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
 
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
-        final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
-        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+        final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern);
+        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
 
         registerTopic(eventSourceTopic);
 
@@ -161,7 +179,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                 .setTopicId(eventSourceTopic.getTopicId())
                 .build();
 
-        return Util.resultFor(cto);
+        return Util.resultRpcSuccessFor(cto);
     }
 
     @Override
@@ -172,23 +190,49 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
     @Override
     public void close() {
         aggregatorRpcReg.close();
+        for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
+            reg.close();
+        }
     }
 
-    public void registerTopic(final EventSourceTopic listener) {
+    private void registerTopic(final EventSourceTopic listener) {
         final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
                 EVENT_SOURCE_TOPOLOGY_PATH,
                 listener,
                 DataBroker.DataChangeScope.SUBTREE);
 
-        registrations.put(listener, listenerRegistration);
+        topicListenerRegistrations.put(listener, listenerRegistration);
+    }
+
+    public void register(final EventSource eventSource){
+        NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+        reg.registerPath(NodeContext.class, sourcePath);
+        routedRpcRegistrations.put(nodeKey,reg);
+        insert(sourcePath);
+
+        for(EventSourceTopic est : topicListenerRegistrations.keySet()){
+            est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
+        }
     }
 
-    public void register(final Node node, final NetconfEventSource netconfEventSource) {
-        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
-        rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
-            .registerPath(NodeContext.class, sourcePath);
-        insert(sourcePath,node);
-        // FIXME: Return registration object.
+    public void unRegister(final EventSource eventSource){
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+        if(removeRegistration != null){
+            removeRegistration.close();
+        remove(sourcePath);
+        }
     }
 
+    @Override
+    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
+            T eventSource) {
+        EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+        register(eventSource);
+        return esr;
+    }
 }
+