BUG 3121 - destroy topic implementation 03/21803/1
authorMarian Adamjak <madamjak@cisco.com>
Tue, 12 May 2015 07:15:21 +0000 (09:15 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Wed, 3 Jun 2015 16:42:50 +0000 (18:42 +0200)
   - implementing destroyTopic rpc in EventSourceTopology
   - add rpc dis-join-topic in event-sourec.yang and implement it
   - fixed bugs identified during test with real device
   - revision of code and cleaning up
   - rebased

Change-Id: I0f43ef2ca1f54db5a08c379792a59c3894a77767
Signed-off-by: Marian Adamjak <madamjak@cisco.com>
(cherry picked from commit d039ff1e73202fa815e72db49d63ded711386f07)

13 files changed:
opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java

index c56243b..c90b266 100644 (file)
@@ -24,7 +24,9 @@ module event-source {
         http://www.eclipse.org/legal/epl-v10.html";
 
     revision "2014-12-02" {
-        description "first revision";
+        description "first revision
+            + add rpc dis-join-topic
+            + add notification event-source-status-notification";
     }
 
     // FIXME: expand this
@@ -84,6 +86,21 @@ module event-source {
         }
     }
 
+    rpc dis-join-topic {
+        input {
+            leaf node {
+               ext:context-reference "inv:node-context";
+               type "instance-identifier";
+            }
+            leaf topic-id {
+                type aggr:topic-id;
+                mandatory true;
+                description "identifier of topic to be disjoin";
+            }
+        }
+
+    }
+
     notification event-source-status-notification {
 
         description
index 6de407f..d6132be 100644 (file)
@@ -8,40 +8,64 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
-
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
-public class EventSourceTopic implements DataChangeListener {
+public class EventSourceTopic implements DataChangeListener, AutoCloseable {
     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
     private final NotificationPattern notificationPattern;
     private final EventSourceService sourceService;
     private final Pattern nodeIdPattern;
     private final TopicId topicId;
+    private ListenerRegistration<DataChangeListener> listenerRegistration;
+    private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
 
-    public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) {
-        this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
-        this.sourceService = eventSource;
-        this.nodeIdPattern = Pattern.compile(nodeIdPattern);
+    public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){
+        final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService());
+        est.registerListner(eventSourceTopology);
+        est.notifyExistingNodes(eventSourceTopology);
+        return est;
+    }
 
-        this.topicId = new TopicId(Util.getUUIDIdent());
+    private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) {
+        this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
+        this.sourceService = Preconditions.checkNotNull(sourceService);
+        this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern);
+        this.topicId = new TopicId(getUUIDIdent());
+        this.listenerRegistration = null;
+        LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
     }
 
     public TopicId getTopicId() {
@@ -50,10 +74,22 @@ public class EventSourceTopic implements DataChangeListener {
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-                for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> createdEntry : event.getCreatedData().entrySet()) {
+            if (createdEntry.getValue() instanceof Node) {
+                final Node node = (Node) createdEntry.getValue();
+                LOG.debug("Create node...");
+                if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) {
+                    LOG.debug("Matched...");
+                    notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+                }
+            }
+        }
+
+        for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
                 final Node node = (Node) changeEntry.getValue();
-                if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) {
+                if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) {
                     notifyNode(changeEntry.getKey());
                 }
             }
@@ -61,19 +97,56 @@ public class EventSourceTopic implements DataChangeListener {
     }
 
     public void notifyNode(final InstanceIdentifier<?> nodeId) {
-
+        LOG.debug("Notify node: {}", nodeId);
         try {
-            RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
+            final RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
             if(rpcResultJoinTopic.isSuccessful() == false){
-                for(RpcError err : rpcResultJoinTopic.getErrors()){
+                for(final RpcError err : rpcResultJoinTopic.getErrors()){
                     LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
                 }
+            } else {
+                joinedEventSources.add(nodeId);
             }
         } catch (final Exception e) {
             LOG.error("Could not invoke join topic for node {}", nodeId);
         }
     }
 
+    private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){
+        LOG.debug("Notify existing nodes");
+        final Pattern nodeRegex = this.nodeIdPattern;
+
+        final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
+        final CheckedFuture<Optional<Topology>, ReadFailedException> future =
+                tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
+
+        Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
+
+            @Override
+            public void onSuccess(final Optional<Topology> data) {
+                if(data.isPresent()) {
+                     final List<Node> nodes = data.get().getNode();
+                     if(nodes != null){
+                        for (final Node node : nodes) {
+                             if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
+                                 notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+                             }
+                         }
+                     }
+                }
+                tx.close();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Can not notify existing nodes", t);
+                tx.close();
+            }
+
+        });
+
+    }
+
     private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
         final NodeRef nodeRef = new NodeRef(path);
         final JoinTopicInput jti =
@@ -89,4 +162,46 @@ public class EventSourceTopic implements DataChangeListener {
         return nodeIdPattern;
     }
 
+    private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId){
+        final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
+        final DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
+                .setNode(nodeRef.getValue())
+                .setTopicId(topicId)
+                .build();
+        return dji;
+    }
+
+    private void registerListner(final EventSourceTopology eventSourceTopology) {
+        this.listenerRegistration =
+                eventSourceTopology.getDataBroker().registerDataChangeListener(
+                        LogicalDatastoreType.OPERATIONAL,
+                        EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH,
+                        this,
+                        DataBroker.DataChangeScope.SUBTREE);
+    }
+
+    @Override
+    public void close() {
+        if(this.listenerRegistration != null){
+            this.listenerRegistration.close();
+        }
+        for(final InstanceIdentifier<?> eventSourceNodeId : joinedEventSources){
+            try {
+                final RpcResult<Void> result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
+                if(result.isSuccessful() == false){
+                    for(final RpcError err : result.getErrors()){
+                        LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString());
+                    }
+                }
+            } catch (InterruptedException | ExecutionException ex) {
+                LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex);
+            }
+        }
+        joinedEventSources.clear();
+    }
+
+    private static String getUUIDIdent(){
+        final UUID uuid = UUID.randomUUID();
+        return uuid.toString();
+    }
 }
index 2795289..3aa470b 100644 (file)
@@ -8,18 +8,14 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import java.util.List;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
-import java.util.regex.Pattern;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
 import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
@@ -32,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
@@ -48,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
@@ -56,11 +52,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
 
 public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
@@ -69,7 +60,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
     private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
     private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
 
-    private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
+    static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
             InstanceIdentifier.create(NetworkTopology.class)
                     .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
 
@@ -78,8 +69,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private final Map<EventSourceTopic, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
-            new ConcurrentHashMap<>();
+    private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
     private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
             new ConcurrentHashMap<>();
 
@@ -105,16 +95,39 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
                                                  final InstanceIdentifier<T> path,
                                                  final T data){
 
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
         tx.put(store, path, data, true);
-        tx.submit();
+        Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
+
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Data has put into datastore {} {}", store, path);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+            }
+        });
 
     }
 
     private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
-        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
         tx.delete(OPERATIONAL, path);
-        tx.submit();
+        Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
+
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Data has deleted from datastore {} {}", store, path);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+            }
+
+        });
     }
 
     private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
@@ -129,100 +142,56 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
         deleteData(OPERATIONAL, augmentPath);
     }
 
-    private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){
-
-        final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
-
-        final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
-
-        Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
-
-            @Override
-            public void onSuccess(Optional<Topology> data) {
-                if(data.isPresent()) {
-                    LOG.info("Topology data are present...");
-                     final List<Node> nodes = data.get().getNode();
-                     if(nodes != null){
-                         LOG.info("List of nodes is not null...");
-                         final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern();
-                     for (final Node node : nodes) {
-                         if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
-                             eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
-                         }
-                     }
-                     } else {
-                         LOG.info("List of nodes is NULL...");
-                     }
-                }
-                tx.close();
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error("Can not notify existing nodes {}", t);
-                tx.close();
-            }
-
-        });
-
-    }
-
     @Override
     public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
-        LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+        LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
                 input.getNotificationPattern(),
                 input.getNodeIdPattern());
 
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+        //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
-        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
+        final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
 
-        registerTopic(eventSourceTopic);
-
-        notifyExistingNodes(eventSourceTopic);
+        eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
 
         final CreateTopicOutput cto = new CreateTopicOutputBuilder()
                 .setTopicId(eventSourceTopic.getTopicId())
                 .build();
 
+        LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
         return Util.resultRpcSuccessFor(cto);
     }
 
     @Override
     public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
-        return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
+        final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
+        if(topicToDestroy != null){
+            topicToDestroy.close();
+        }
+        return Util.resultRpcSuccessFor((Void) null);
     }
 
     @Override
     public void close() {
         aggregatorRpcReg.close();
-        for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
-            reg.close();
+        for(final EventSourceTopic est : eventSourceTopicMap.values()){
+            est.close();
         }
     }
 
-    private void registerTopic(final EventSourceTopic listener) {
-        final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
-                EVENT_SOURCE_TOPOLOGY_PATH,
-                listener,
-                DataBroker.DataChangeScope.SUBTREE);
-
-        topicListenerRegistrations.put(listener, listenerRegistration);
-    }
-
     public void register(final EventSource eventSource){
-        NodeKey nodeKey = eventSource.getSourceNodeKey();
+
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
         final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
-        RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+        final RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
         reg.registerPath(NodeContext.class, sourcePath);
         routedRpcRegistrations.put(nodeKey,reg);
         insert(sourcePath);
 
-        for(EventSourceTopic est : topicListenerRegistrations.keySet()){
-            if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){
-                est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
-            }
-        }
     }
 
     public void unRegister(final EventSource eventSource){
@@ -236,11 +205,18 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR
     }
 
     @Override
-    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
-            T eventSource) {
-        EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
+        final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
         register(eventSource);
         return esr;
     }
+
+    DataBroker getDataBroker() {
+        return dataBroker;
+    }
+
+    EventSourceService getEventSourceService() {
+        return eventSourceService;
+    }
 }
 
index d6bcbf2..c70fd3c 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.messagebus.app.impl;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
@@ -23,11 +22,6 @@ import com.google.common.util.concurrent.Futures;
 
 public final class Util {
 
-    public static String getUUIDIdent(){
-        UUID uuid = UUID.randomUUID();
-        return uuid.toString();
-    }
-
     public static <T> Future<RpcResult<T>> resultRpcSuccessFor(final T output) {
         final RpcResult<T> result = RpcResultBuilder.success(output).build();
         return Futures.immediateFuture(result);
index 2cbac7b..61e1af8 100644 (file)
@@ -7,8 +7,8 @@
  */
 package org.opendaylight.controller.messagebus.eventsources.netconf;
 
-import java.net.URI;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.xml.parsers.DocumentBuilder;
@@ -51,40 +51,42 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe
     public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
         super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
         this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener);
-        LOG.info("Connection notification source has been initialized...");
+        LOG.info("Connection notification source has been initialized.");
         setActive(true);
         setReplaySupported(false);
     }
 
     @Override
     public void close() throws Exception {
-        LOG.info("Connection notification - publish Deactive");
-        publishNotification(EventSourceStatus.Deactive);
-        notificationTopicMap.clear();
-        setActive(false);
+        if(isActive()){
+            LOG.debug("Connection notification - publish Deactive");
+            publishNotification(EventSourceStatus.Deactive);
+            notificationTopicMap.clear();
+            setActive(false);
+        }
     }
 
     @Override
     void activateNotificationSource() {
-        LOG.info("Connection notification - publish Active");
+        LOG.debug("Connection notification - publish Active");
         publishNotification(EventSourceStatus.Active);
     }
 
     @Override
     void deActivateNotificationSource() {
-        LOG.info("Connection notification - publish Inactive");
+        LOG.debug("Connection notification - publish Inactive");
         publishNotification(EventSourceStatus.Inactive);
     }
 
     @Override
     void reActivateNotificationSource() {
-        LOG.info("Connection notification - reactivate - publish active");
+        LOG.debug("Connection notification - reactivate - publish active");
         publishNotification(EventSourceStatus.Active);
     }
 
     @Override
     boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
-        if(validateNotifactionSchemaPath(notificationPath) == false){
+        if(checkNotificationPath(notificationPath) == false){
             LOG.debug("Bad SchemaPath for notification try to register");
             return false;
         }
@@ -107,16 +109,20 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe
     }
 
     @Override
-    void unRegisterNotificationTopic(TopicId topicId) {
-        // TODO: need code when EventAggregator.destroyTopic will be implemented
-    }
-
-    private boolean validateNotifactionSchemaPath(SchemaPath notificationPath){
-        if(notificationPath == null){
-            return false;
+    synchronized void unRegisterNotificationTopic(TopicId topicId) {
+        List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+        for(SchemaPath notifKey : notificationTopicMap.keySet()){
+            ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+            if(topicList != null){
+                topicList.remove(topicId);
+                if(topicList.isEmpty()){
+                    notificationPathToRemove.add(notifKey);
+                }
+            }
+        }
+        for(SchemaPath notifKey : notificationPathToRemove){
+            notificationTopicMap.remove(notifKey);
         }
-        URI notificationNameSpace = notificationPath.getLastComponent().getNamespace();
-        return getNotificationUrnPrefix().startsWith(notificationNameSpace.toString());
     }
 
     private void publishNotification(EventSourceStatus eventSourceStatus){
index a640064..e4ad387 100644 (file)
@@ -47,6 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
@@ -111,9 +112,12 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
         notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
         Optional<Map<String, Stream>> streamMap = getAvailableStreams();
         if(streamMap.isPresent()){
+            LOG.debug("Stream configuration compare...");
             for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
                 final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+                LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
                 if(streamMap.get().containsKey(streamName)){
+                    LOG.debug("Stream containig on device");
                     notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
                 }
             }
@@ -127,7 +131,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
         Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
 
         if(dataBroker.isPresent()){
-
+            LOG.debug("GET Available streams ...");
             ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
             CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
 
@@ -136,6 +140,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
                 if(streams.isPresent()){
                     streamMap = new HashMap<>();
                     for(Stream stream : streams.get().getStream()){
+                        LOG.debug("*** find stream {}", stream.getName().getValue());
                         streamMap.put(stream.getName().getValue(), stream);
                     }
                 }
@@ -143,6 +148,8 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
                 LOG.warn("Can not read streams for node {}",this.nodeId);
             }
 
+        } else {
+            LOG.warn("No databroker on node {}", this.nodeId);
         }
 
         return Optional.fromNullable(streamMap);
@@ -150,34 +157,49 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
 
     @Override
     public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
-
+        LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
         final NotificationPattern notificationPattern = input.getNotificationPattern();
         final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
         return registerTopic(input.getTopicId(),matchingNotifications);
 
     }
 
-    private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+    @Override
+    public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+         for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
+             reg.unRegisterNotificationTopic(input.getTopicId());
+         }
+        return Util.resultRpcSuccessFor((Void) null) ;
+    }
 
+    private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+        LOG.debug("Join topic {} - register");
         JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
         if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
+            LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() );
             final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
             if(notifyService.isPresent()){
-                int subscribedStreams = 0;
+                int registeredNotificationCount = 0;
                 for(SchemaPath schemaNotification : notificationsToSubscribe){
                    for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
-                      LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
-                      reg.activateNotificationSource();
-                      boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
-                      if(regSuccess){
-                         subscribedStreams = subscribedStreams +1;
-                      }
+                       LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName());
+                       if(reg.checkNotificationPath(schemaNotification)){
+                           LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
+                           boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+                           if(regSuccess){
+                              registeredNotificationCount = registeredNotificationCount +1;
+                           }
+                       }
                    }
                 }
-                if(subscribedStreams > 0){
+                if(registeredNotificationCount > 0){
                     joinTopicStatus = JoinTopicStatus.Up;
                 }
+            } else {
+                LOG.warn("NO DOMNotification service on node {}", this.nodeId);
             }
+        } else {
+            LOG.debug("Notifications to subscribe has NOT found");
         }
 
         final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
@@ -195,13 +217,12 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
     public void deActivateStreams(){
         for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
            LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
-            reg.deActivateNotificationSource();
+           reg.deActivateNotificationSource();
         }
     }
 
     @Override
     public void onNotification(final DOMNotification notification) {
-        LOG.info("Notification {} has been arrived...",notification.getType());
         SchemaPath notificationPath = notification.getType();
         Date notificationEventTime = null;
         if(notification instanceof DOMEvent){
@@ -218,7 +239,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener
 
                 for(TopicId topicId : topicIdsForNotification){
                     publishNotification(notification, topicId);
-                    LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+                    LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
                 }
 
             }
index 180d3d4..6d19c52 100644 (file)
@@ -107,7 +107,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
 
-        LOG.info("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+        LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
                 nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue());
index 891887e..2dbda37 100644 (file)
@@ -60,7 +60,7 @@ public class NetconfEventSourceRegistration implements AutoCloseable{
         }
         NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager);
         nesr.updateStatus();
-        LOG.info("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
+        LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
         return nesr;
     }
 
index 7812bd2..c12b67e 100644 (file)
@@ -11,10 +11,14 @@ import java.util.ArrayList;
 
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public abstract class NotificationTopicRegistration implements AutoCloseable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class);
+
     public enum NotificationSourceType{
         NetconfDeviceStream,
         ConnectionStatusChange;
@@ -54,6 +58,14 @@ public abstract class NotificationTopicRegistration implements AutoCloseable {
         return notificationUrnPrefix;
     }
 
+    public boolean checkNotificationPath(SchemaPath notificationPath){
+        if(notificationPath == null){
+            return false;
+        }
+        String nameSpace = notificationPath.getLastComponent().toString();
+        LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, getNotificationUrnPrefix());
+        return nameSpace.startsWith(getNotificationUrnPrefix());
+    }
     abstract void activateNotificationSource();
 
     abstract void deActivateNotificationSource();
index e0d4fe2..2e654d0 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.messagebus.eventsources.netconf;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
@@ -58,6 +59,7 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
         this.lastEventTime= null;
         setReplaySupported(this.stream.isReplaySupport());
         setActive(false);
+        LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
     }
 
     void activateNotificationSource() {
@@ -129,16 +131,26 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
 
     @Override
     boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
-        if(validateNotificationPath(notificationPath) == false){
+
+        if(checkNotificationPath(notificationPath) == false){
             LOG.debug("Bad SchemaPath for notification try to register");
             return false;
         }
+
         final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
         if(notifyService.isPresent() == false){
             LOG.debug("DOMNotificationService is not present");
             return false;
         }
-        ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
+
+        activateNotificationSource();
+        if(isActive() == false){
+            LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString());
+            return false;
+        }
+
+        ListenerRegistration<NetconfEventSource> registration =
+                notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
         notificationRegistrationMap.put(notificationPath, registration);
         ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
         if(topicIds == null){
@@ -149,16 +161,30 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
                 topicIds.add(topicId);
             }
         }
+
         notificationTopicMap.put(notificationPath, topicIds);
         return true;
     }
 
-    private boolean validateNotificationPath(SchemaPath notificationPath){
-        if(notificationPath == null){
-            return false;
+    @Override
+    synchronized void unRegisterNotificationTopic(TopicId topicId) {
+        List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+        for(SchemaPath notifKey : notificationTopicMap.keySet()){
+            ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+            if(topicList != null){
+                topicList.remove(topicId);
+                if(topicList.isEmpty()){
+                    notificationPathToRemove.add(notifKey);
+                }
+            }
+        }
+        for(SchemaPath notifKey : notificationPathToRemove){
+            notificationTopicMap.remove(notifKey);
+            ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+            if(reg != null){
+                reg.close();
+            }
         }
-        String nameSpace = notificationPath.getLastComponent().toString();
-        return nameSpace.startsWith(getNotificationUrnPrefix());
     }
 
     Optional<Date> getLastEventTime() {
@@ -175,9 +201,4 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
         closeStream();
     }
 
-    @Override
-    void unRegisterNotificationTopic(TopicId topicId) {
-        // TODO: use it when destroy topic will be implemented
-    }
-
 }
index e26502f..fe5e60b 100644 (file)
@@ -28,6 +28,8 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.osgi.framework.BundleContext;
 
+import com.google.common.util.concurrent.CheckedFuture;
+
 import javax.management.ObjectName;
 
 import static org.junit.Assert.assertEquals;
@@ -101,7 +103,8 @@ public class MessageBusAppImplModuleTest {
         WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
         doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
         doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true));
-
+        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureMock).when(writeTransactionMock).submit();
         assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance());
     }
 
index 9f513c4..0c32cb5 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.messagebus.app.impl;
 
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -20,20 +22,30 @@ import java.util.Map;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
+import com.google.common.util.concurrent.CheckedFuture;
+
 public class EventSourceTopicTest {
 
     EventSourceTopic eventSourceTopic;
-    org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dataObjectMock;
+    Node dataObjectNodeMock;
     NodeId nodeIdMock;
+    DataBroker dataBrokerMock;
     EventSourceService eventSourceServiceMock;
+    EventSourceTopology eventSourceTopologyMock;
 
     @BeforeClass
     public static void initTestClass() throws IllegalAccessException, InstantiationException {
@@ -43,7 +55,22 @@ public class EventSourceTopicTest {
     public void setUp() throws Exception {
         NotificationPattern notificationPattern = new NotificationPattern("value1");
         eventSourceServiceMock = mock(EventSourceService.class);
-        eventSourceTopic = new EventSourceTopic(notificationPattern, "nodeIdPattern1", eventSourceServiceMock);
+        eventSourceTopologyMock = mock(EventSourceTopology.class);
+        dataBrokerMock = mock(DataBroker.class);
+        doReturn(eventSourceServiceMock).when(eventSourceTopologyMock).getEventSourceService();
+        doReturn(dataBrokerMock).when(eventSourceTopologyMock).getDataBroker();
+
+        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class),eq(true));
+        CheckedFuture checkedFutureWriteMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureWriteMock).when(writeTransactionMock).submit();
+
+        ReadOnlyTransaction readOnlyTransactionMock = mock(ReadOnlyTransaction.class);
+        doReturn(readOnlyTransactionMock).when(dataBrokerMock).newReadOnlyTransaction();
+        CheckedFuture checkedFutureReadMock = mock(CheckedFuture.class);
+        doReturn(checkedFutureReadMock).when(readOnlyTransactionMock).read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
+        eventSourceTopic = EventSourceTopic.create(notificationPattern, "nodeIdPattern1", eventSourceTopologyMock);
     }
 
     @Test
@@ -61,19 +88,20 @@ public class EventSourceTopicTest {
         AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
         onDataChangedTestHelper(asyncDataChangeEventMock);
         eventSourceTopic.onDataChanged(asyncDataChangeEventMock);
-        verify(dataObjectMock, times(1)).getId();
-        verify(nodeIdMock, times(1)).getValue();
+        verify(dataObjectNodeMock, times(2)).getNodeId();
+        verify(nodeIdMock, times(2)).getValue();
     }
 
     private void onDataChangedTestHelper(AsyncDataChangeEvent asyncDataChangeEventMock){
         Map<InstanceIdentifier<?>, DataObject> map = new HashMap<>();
         InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
-        dataObjectMock = mock(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
-        map.put(instanceIdentifierMock, dataObjectMock);
+        dataObjectNodeMock = mock(Node.class);
+        doReturn(getNodeKey("testNodeId01")).when(dataObjectNodeMock).getKey();
+        map.put(instanceIdentifierMock, dataObjectNodeMock);
         doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
-
+        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
         nodeIdMock = mock(NodeId.class);
-        doReturn(nodeIdMock).when(dataObjectMock).getId();
+        doReturn(nodeIdMock).when(dataObjectNodeMock).getNodeId();
         doReturn("nodeIdPattern1").when(nodeIdMock).getValue();
     }
 
@@ -84,4 +112,7 @@ public class EventSourceTopicTest {
         verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
     }
 
+    public NodeKey getNodeKey(String nodeId){
+        return new NodeKey(new NodeId(nodeId));
+    }
 }
\ No newline at end of file
index 50ae4d9..68f2a43 100644 (file)
@@ -22,10 +22,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -35,9 +33,11 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistr
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
@@ -62,10 +62,6 @@ public class EventSourceTopologyTest {
     NodeKey nodeKey;
     RpcRegistration<EventAggregatorService> aggregatorRpcReg;
 
-    @BeforeClass
-    public static void initTestClass() throws IllegalAccessException, InstantiationException {
-    }
-
     @Before
     public void setUp() throws Exception {
         dataBrokerMock = mock(DataBroker.class);
@@ -97,6 +93,18 @@ public class EventSourceTopologyTest {
         assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
     }
 
+    @Test
+    public void destroyTopicTest() throws Exception{
+        topicTestHelper();
+        TopicId topicId = new TopicId("topic-id-007");
+        Map<TopicId,EventSourceTopic> localMap = getEventSourceTopicMap();
+        EventSourceTopic eventSourceTopicMock = mock(EventSourceTopic.class);
+        localMap.put(topicId, eventSourceTopicMock);
+        DestroyTopicInput input = new DestroyTopicInputBuilder().setTopicId(topicId).build();
+        eventSourceTopology.destroyTopic(input);
+        verify(eventSourceTopicMock, times(1)).close();
+    }
+
     private void topicTestHelper() throws Exception{
         constructorTestHelper();
         createTopicInputMock = mock(CreateTopicInput.class);
@@ -134,25 +142,17 @@ public class EventSourceTopologyTest {
         doReturn(nodeId).when(nodeMock).getNodeId();
     }
 
-    @Test
-    public void destroyTopicTest() throws Exception{
-        topicTestHelper();
-        //TODO: modify test when destroyTopic will be implemented
-        DestroyTopicInput destroyTopicInput = null;
-        assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
-    }
-
     @Test
     public void closeTest() throws Exception{
         constructorTestHelper();
         topicTestHelper();
-        Map<DataChangeListener, ListenerRegistration<DataChangeListener>> localMap = getTopicListenerRegistrations();
-        DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class);
-        ListenerRegistration<DataChangeListener> listenerListenerRegistrationMock = (ListenerRegistration<DataChangeListener>) mock(ListenerRegistration.class);
-        localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock);
+        Map<TopicId,EventSourceTopic> localMap = getEventSourceTopicMap();
+        TopicId topicIdMock = mock(TopicId.class);
+        EventSourceTopic eventSourceTopicMock = mock(EventSourceTopic.class);
+        localMap.put(topicIdMock, eventSourceTopicMock);
         eventSourceTopology.close();
         verify(aggregatorRpcReg, times(1)).close();
-        verify(listenerListenerRegistrationMock, times(1)).close();
+        verify(eventSourceTopicMock, times(1)).close();
     }
 
     @Test
@@ -201,8 +201,8 @@ public class EventSourceTopologyTest {
         assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock));
     }
 
-    private Map getTopicListenerRegistrations() throws Exception{
-        Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations");
+    private Map getEventSourceTopicMap() throws Exception{
+        Field nesField = EventSourceTopology.class.getDeclaredField("eventSourceTopicMap");
         nesField.setAccessible(true);
         return (Map) nesField.get(eventSourceTopology);
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.