BUG 3121 - destroy topic implementation
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
index 279528907c049d3f5cc172dac57e53658331b95a..3aa470b10ad9940729043fd918a9727ffbf5ab84 100644 (file)
@@ -8,18 +8,14 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import java.util.List;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
@@ -32,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
@@ -48,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
@@ -56,11 +52,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
 
 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
@@ -69,7 +60,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
 
-    private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
+    static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
             InstanceIdentifier.create(NetworkTopology.class)
                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
 
@@ -78,8 +69,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private final Map<EventSourceTopic, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
-            new ConcurrentHashMap<>();
+    private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
     private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
             new ConcurrentHashMap<>();
 
@@ -105,16 +95,39 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
                                                  final InstanceIdentifier<T> path,
                                                  final T data){
 
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
         tx.put(store, path, data, true);
-        tx.submit();
+        Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
+
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Data has put into datastore {} {}", store, path);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+            }
+        });
 
     }
 
     private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
         tx.delete(OPERATIONAL, path);
-        tx.submit();
+        Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
+
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Data has deleted from datastore {} {}", store, path);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+            }
+
+        });
     }
 
     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
@@ -129,100 +142,56 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
         deleteData(OPERATIONAL, augmentPath);
     }
 
-    private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){
-
-        final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
-
-        final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
-
-        Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
-
-            @Override
-            public void onSuccess(Optional<Topology> data) {
-                if(data.isPresent()) {
-                    LOG.info("Topology data are present...");
-                     final List<Node> nodes = data.get().getNode();
-                     if(nodes != null){
-                         LOG.info("List of nodes is not null...");
-                         final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern();
-                     for (final Node node : nodes) {
-                         if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
-                             eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
-                         }
-                     }
-                     } else {
-                         LOG.info("List of nodes is NULL...");
-                     }
-                }
-                tx.close();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error("Can not notify existing nodes {}", t);
-                tx.close();
-            }
-
-        });
-
-    }
-
     @Override
     public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
-        LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+        LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
                 input.getNotificationPattern(),
                 input.getNodeIdPattern());
 
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+        //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
-        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
+        final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
 
-        registerTopic(eventSourceTopic);
-
-        notifyExistingNodes(eventSourceTopic);
+        eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
 
         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
                 .setTopicId(eventSourceTopic.getTopicId())
                 .build();
 
+        LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
         return Util.resultRpcSuccessFor(cto);
     }
 
     @Override
     public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
-        return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
+        final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
+        if(topicToDestroy != null){
+            topicToDestroy.close();
+        }
+        return Util.resultRpcSuccessFor((Void) null);
     }
 
     @Override
     public void close() {
         aggregatorRpcReg.close();
-        for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
-            reg.close();
+        for(final EventSourceTopic est : eventSourceTopicMap.values()){
+            est.close();
         }
     }
 
-    private void registerTopic(final EventSourceTopic listener) {
-        final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
-                EVENT_SOURCE_TOPOLOGY_PATH,
-                listener,
-                DataBroker.DataChangeScope.SUBTREE);
-
-        topicListenerRegistrations.put(listener, listenerRegistration);
-    }
-
     public void register(final EventSource eventSource){
-        NodeKey nodeKey = eventSource.getSourceNodeKey();
+
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
-        RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+        final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
         reg.registerPath(NodeContext.class, sourcePath);
         routedRpcRegistrations.put(nodeKey,reg);
         insert(sourcePath);
 
-        for(EventSourceTopic est : topicListenerRegistrations.keySet()){
-            if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){
-                est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
-            }
-        }
     }
 
     public void unRegister(final EventSource eventSource){
@@ -236,11 +205,18 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
     }
 
     @Override
-    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
-            T eventSource) {
-        EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
+        final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
         register(eventSource);
         return esr;
     }
+
+    DataBroker getDataBroker() {
+        return dataBroker;
+    }
+
+    EventSourceService getEventSourceService() {
+        return eventSourceService;
+    }
 }