Fixup Augmentable and Identifiable methods changing
[controller.git] / opendaylight / md-sal / messagebus-impl / src / main / java / org / opendaylight / controller / messagebus / app / impl / EventSourceTopology.java
index 603f34bac994d9ff44c8e37ed9f99bca11b17c47..934056dcab81f2eeb81f387a6d6986c31ec5b629 100644 (file)
@@ -8,31 +8,32 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-
-import java.util.Collections;
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.messagebus.app.util.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
@@ -49,7 +50,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
@@ -57,16 +57,14 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.regex.Pattern;
-
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
 
     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
 
-    private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
+    static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
             InstanceIdentifier.create(NetworkTopology.class)
                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
 
@@ -75,134 +73,161 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+    private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
+    private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
             new ConcurrentHashMap<>();
 
     private final DataBroker dataBroker;
     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
     private final EventSourceService eventSourceService;
     private final RpcProviderRegistry rpcRegistry;
-    private final ExecutorService executorService;
 
     public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
         this.dataBroker = dataBroker;
-        this.executorService = Executors.newCachedThreadPool();
         this.rpcRegistry = rpcRegistry;
         aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
         eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
 
         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
-        final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+        final TopologyTypes1 topologyTypeAugment =
+                new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+        LOG.info("EventSourceRegistry has been initialized");
     }
 
     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
-            final InstanceIdentifier<T> path, final T data) {
+                                                 final InstanceIdentifier<T> path,
+                                                 final T data) {
 
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
         tx.put(store, path, data, true);
-        tx.submit();
+        Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Data has put into datastore {} {}", store, path);
+            }
+
+            @Override
+            public void onFailure(final Throwable ex) {
+                LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
-    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
-        final NodeKey nodeKey = node.getKey();
+    private <T extends DataObject>  void deleteData(final LogicalDatastoreType store,
+            final InstanceIdentifier<T> path) {
+        final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
+        tx.delete(OPERATIONAL, path);
+        Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Data has deleted from datastore {} {}", store, path);
+            }
+
+            @Override
+            public void onFailure(final Throwable ex) {
+                LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
+    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+        final NodeKey nodeKey = sourcePath.getKey();
         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
-        final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
+        final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
+                new NodeId(nodeKey.getNodeId().getValue())).build();
         putData(OPERATIONAL, augmentPath, nodeAgument);
     }
 
-    private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
-        executorService.execute(new NotifyAllNodeExecutor(dataBroker, nodeIdPatternRegex, eventSourceTopic));
+    private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+        final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+        deleteData(OPERATIONAL, augmentPath);
     }
 
     @Override
-    public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
-        LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+    public ListenableFuture<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+        LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
                 input.getNotificationPattern(),
                 input.getNodeIdPattern());
 
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+        //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
-        final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
-        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
-
-        registerTopic(eventSourceTopic);
+        final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
 
-        notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+        eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
 
         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
                 .setTopicId(eventSourceTopic.getTopicId())
                 .build();
 
-        return Util.resultFor(cto);
+        LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
+        return Util.resultRpcSuccessFor(cto);
     }
 
     @Override
-    public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
-        return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
+    public ListenableFuture<RpcResult<DestroyTopicOutput>> destroyTopic(final DestroyTopicInput input) {
+        final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
+        if (topicToDestroy != null) {
+            topicToDestroy.close();
+        }
+        return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build());
     }
 
     @Override
     public void close() {
         aggregatorRpcReg.close();
+        eventSourceTopicMap.values().forEach(EventSourceTopic::close);
     }
 
-    public void registerTopic(final EventSourceTopic listener) {
-        final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
-                EVENT_SOURCE_TOPOLOGY_PATH,
-                listener,
-                DataBroker.DataChangeScope.SUBTREE);
+    public void register(final EventSource eventSource) {
+
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(
+                EventSourceService.class, eventSource);
+        reg.registerPath(NodeContext.class, sourcePath);
+        routedRpcRegistrations.put(nodeKey,reg);
+        insert(sourcePath);
 
-        registrations.put(listener, listenerRegistration);
     }
 
-    public void register(final Node node, final NetconfEventSource netconfEventSource) {
-        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
-        rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
-            .registerPath(NodeContext.class, sourcePath);
-        insert(sourcePath,node);
-        // FIXME: Return registration object.
+    public void unRegister(final EventSource eventSource) {
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+        if (removeRegistration != null) {
+            removeRegistration.close();
+            remove(sourcePath);
+        }
     }
 
-    private class NotifyAllNodeExecutor implements Runnable {
+    @Override
+    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
+        final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+        register(eventSource);
+        return esr;
+    }
 
-        private final EventSourceTopic topic;
-        private final DataBroker dataBroker;
-        private final Pattern nodeIdPatternRegex;
+    DataBroker getDataBroker() {
+        return dataBroker;
+    }
 
-        public NotifyAllNodeExecutor(final DataBroker dataBroker, final Pattern nodeIdPatternRegex, final EventSourceTopic topic) {
-            this.topic = topic;
-            this.dataBroker = dataBroker;
-            this.nodeIdPatternRegex = nodeIdPatternRegex;
-        }
+    EventSourceService getEventSourceService() {
+        return eventSourceService;
+    }
 
-        @Override
-        public void run() {
-            //# Code reader note: Context of Node type is NetworkTopology
-            final List<Node> nodes = snapshot();
-            for (final Node node : nodes) {
-                if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
-                    topic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
-                }
-            }
-        }
+    @VisibleForTesting
+    Map<NodeKey, RoutedRpcRegistration<EventSourceService>> getRoutedRpcRegistrations() {
+        return routedRpcRegistrations;
+    }
 
-        private List<Node> snapshot() {
-            try (ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();) {
-
-                final Optional<Topology> data = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH).checkedGet();
-
-                if(data.isPresent()) {
-                    final List<Node> nodeList = data.get().getNode();
-                    if(nodeList != null) {
-                        return nodeList;
-                    }
-                }
-                return Collections.emptyList();
-            } catch (final ReadFailedException e) {
-                LOG.error("Unable to retrieve node list.", e);
-                return Collections.emptyList();
-            }
-        }
+    @VisibleForTesting
+    Map<TopicId, EventSourceTopic> getEventSourceTopicMap() {
+        return eventSourceTopicMap;
     }
 }